From be4aeaec24d12c0af19d8497dafcb6c60de0dfba Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Sun, 12 Sep 2021 00:54:25 +0800 Subject: [PATCH 01/14] [HUDI-2413] fix Sql source's checkpoint --- .../hudi/utilities/sources/SqlSource.java | 3 ++- .../hudi/utilities/sources/TestSqlSource.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java index d832e43d2ae0b..12f32e4391e20 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.log4j.LogManager; @@ -81,7 +82,7 @@ protected Pair>, String> fetchNextBatch( .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) .toArray(String[]::new)); } - return Pair.of(Option.of(source), null); + return Pair.of(Option.of(source), StringUtils.EMPTY_STRING); } /** diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java index 9c3d5584a5dd7..9157beb6808ce 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java @@ -135,6 +135,23 @@ public void testSqlSourceRowFormat() throws IOException { assertEquals(10000, fetch1AsRows.getBatch().get().count()); } + /** + * Runs the test scenario of reading data from the source in row format. + * Source has no records. + * + * @throws IOException + */ + @Test + public void testSqlSourceCheckpoint() throws IOException { + props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0"); + sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider); + sourceFormatAdapter = new SourceFormatAdapter(sqlSource); + + InputBatch> fetch1AsRows = + sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); + assertEquals("", fetch1AsRows.getCheckpointForNextBatch()); + } + /** * Runs the test scenario of reading data from the source in row format. * Source has more records than source limit. From 02d8da925ca90b99ef2b6c3351d3ba5d3c32177e Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Sun, 12 Sep 2021 01:21:53 +0800 Subject: [PATCH 02/14] Revert "[HUDI-2413] fix Sql source's checkpoint" This reverts commit be4aeaec --- .../hudi/utilities/sources/SqlSource.java | 3 +-- .../hudi/utilities/sources/TestSqlSource.java | 17 ----------------- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java index 12f32e4391e20..d832e43d2ae0b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SqlSource.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.log4j.LogManager; @@ -82,7 +81,7 @@ protected Pair>, String> fetchNextBatch( .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) .toArray(String[]::new)); } - return Pair.of(Option.of(source), StringUtils.EMPTY_STRING); + return Pair.of(Option.of(source), null); } /** diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java index 9157beb6808ce..9c3d5584a5dd7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java @@ -135,23 +135,6 @@ public void testSqlSourceRowFormat() throws IOException { assertEquals(10000, fetch1AsRows.getBatch().get().count()); } - /** - * Runs the test scenario of reading data from the source in row format. - * Source has no records. - * - * @throws IOException - */ - @Test - public void testSqlSourceCheckpoint() throws IOException { - props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0"); - sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider); - sourceFormatAdapter = new SourceFormatAdapter(sqlSource); - - InputBatch> fetch1AsRows = - sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); - assertEquals("", fetch1AsRows.getCheckpointForNextBatch()); - } - /** * Runs the test scenario of reading data from the source in row format. * Source has more records than source limit. From b12f205de8b1d4483d05cd54db43f71352ccb493 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Sun, 10 Jul 2022 01:25:28 +0800 Subject: [PATCH 03/14] [HUDI-4065] FileBasedLockProvider --- .../lock/FileBasedLockProvider.java | 159 ++++++++++++++++++ .../apache/hudi/config/HoodieLockConfig.java | 7 + .../TestFileBasedLockProvider.java | 103 ++++++++++++ .../hudi/common/config/LockConfiguration.java | 2 + 4 files changed, 271 insertions(+) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java new file mode 100644 index 0000000000000..d0ae5030fcd27 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; + +/** + * A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations + * using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again. + * NOTE: This only works for DFS with atomic create/delete operation + */ +public class FileBasedLockProvider implements LockProvider, Serializable { + + private static final Logger LOG = LogManager.getLogger(FileBasedLockProvider.class); + + private static final String LOCK = "lock"; + + private final int retryMaxCount; + private final int retryWaitTimeMs; + private final int lockTimeoutMinutes; + private transient FileSystem fs; + private transient Path lockFile; + protected LockConfiguration lockConfiguration; + + public FileBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { + checkRequiredProps(lockConfiguration); + this.lockConfiguration = lockConfiguration; + final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); + this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); + this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY); + this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY); + this.lockFile = new Path(lockDirectory + "/" + LOCK); + this.fs = FSUtils.getFs(this.lockFile.toString(), configuration); + } + + @Override + public void close() { + synchronized (LOCK) { + try { + fs.delete(this.lockFile, true); + } catch (IOException e) { + throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e); + } + } + } + + @Override + public boolean tryLock(long time, TimeUnit unit) { + try { + int numRetries = 0; + synchronized (LOCK) { + while (fs.exists(this.lockFile)) { + LOCK.wait(retryWaitTimeMs); + numRetries++; + if (numRetries > retryMaxCount) { + return false; + } + // Check whether lock is already expired or not, if so try to delete lock file + if (lockTimeoutMinutes != 0 && checkIfExpired()) { + if (fs.delete(this.lockFile, true)) { + break; + } + } + } + acquireLock(); + return fs.exists(this.lockFile); + } + } catch (IOException | InterruptedException e) { + throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); + } + } + + @Override + public void unlock() { + synchronized (LOCK) { + try { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + } catch (IOException io) { + throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io); + } + } + } + + @Override + public String getLock() { + return this.lockFile.toString(); + } + + private boolean checkIfExpired() { + try { + long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime(); + if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) { + return true; + } + } catch (IOException e) { + LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e); + } + return false; + } + + private void acquireLock() { + try { + fs.create(this.lockFile, false).close(); + } catch (IOException e) { + throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); + } + } + + protected String generateLogStatement(LockState state) { + return StringUtils.join(state.name(), " lock at", getLock()); + } + + private void checkRequiredProps(final LockConfiguration config) { + ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY) != null); + ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY) > 0); + ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY) > 0); + ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index 9ea28fbbd42e7..10a9afbe751b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -36,6 +36,7 @@ import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY; @@ -108,6 +109,12 @@ public class HoodieLockConfig extends HoodieConfig { .sinceVersion("0.8.0") .withDocumentation("For DFS based lock providers, path to store the locks under."); + public static final ConfigProperty FILESYSTEM_LOCK_EXPIRE = ConfigProperty + .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) + .defaultValue(0) + .sinceVersion("0.12.0") + .withDocumentation("For DFS based lock providers, expire time in minutes"); + public static final ConfigProperty HIVE_DATABASE_NAME = ConfigProperty .key(HIVE_DATABASE_NAME_PROP_KEY) .noDefaultValue() diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java new file mode 100644 index 0000000000000..81915304bd141 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.transaction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hudi.client.transaction.lock.FileBasedLockProvider; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; + +public class TestFileBasedLockProvider { + private static final Logger LOG = LogManager.getLogger(TestFileBasedLockProvider.class); + private static HdfsTestService hdfsTestService; + private static MiniDFSCluster dfsCluster; + private static LockConfiguration lockConfiguration; + private static Configuration hadoopConf; + + @BeforeAll + public static void setup() throws IOException { + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + hadoopConf = dfsCluster.getFileSystem().getConf(); + + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); + properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1"); + properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); + lockConfiguration = new LockConfiguration(properties); + } + + @Test + public void testAcquireLock() { + FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + } + + @Test + public void testUnLock() { + FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS); + } + + @Test + public void testReentrantLock() { + FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + try { + fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS); + Assertions.fail(); + } catch (HoodieLockException e) { + // expected + } + fileBasedLockProvider.unlock(); + } + + @Test + public void testUnlockWithoutLock() { + FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + fileBasedLockProvider.unlock(); + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java index 0109f22097a31..c6ebc54e95d78 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java @@ -48,6 +48,8 @@ public class LockConfiguration implements Serializable { public static final String FILESYSTEM_LOCK_PATH_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path"; + public static final String FILESYSTEM_LOCK_EXPIRE_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "expire"; + // configs for metastore based locks public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore."; From b57aab2953e7a1815096d0cacfd714ba94a411ab Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Wed, 13 Jul 2022 03:28:39 +0800 Subject: [PATCH 04/14] [HUDI-4065] FileBasedLockProvider review comments --- ....java => FileSystemBasedLockProvider.java} | 20 ++++---- .../apache/hudi/config/HoodieLockConfig.java | 2 +- .../client}/TestFileBasedLockProvider.java | 46 +++++++++++++++---- 3 files changed, 48 insertions(+), 20 deletions(-) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/{FileBasedLockProvider.java => FileSystemBasedLockProvider.java} (89%) rename hudi-client/{hudi-client-common/src/test/java/org/apache/hudi/client/transaction => hudi-spark-client/src/test/java/org/apache/hudi/client}/TestFileBasedLockProvider.java (67%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java similarity index 89% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index d0ae5030fcd27..030de192befea 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -47,11 +47,11 @@ * using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again. * NOTE: This only works for DFS with atomic create/delete operation */ -public class FileBasedLockProvider implements LockProvider, Serializable { +public class FileSystemBasedLockProvider implements LockProvider, Serializable { - private static final Logger LOG = LogManager.getLogger(FileBasedLockProvider.class); + private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class); - private static final String LOCK = "lock"; + private static final String LOCK_FILE_NAME = "lock"; private final int retryMaxCount; private final int retryWaitTimeMs; @@ -60,20 +60,20 @@ public class FileBasedLockProvider implements LockProvider, Serializable private transient Path lockFile; protected LockConfiguration lockConfiguration; - public FileBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { + public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY); this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY); - this.lockFile = new Path(lockDirectory + "/" + LOCK); + this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME); this.fs = FSUtils.getFs(this.lockFile.toString(), configuration); } @Override public void close() { - synchronized (LOCK) { + synchronized (LOCK_FILE_NAME) { try { fs.delete(this.lockFile, true); } catch (IOException e) { @@ -86,9 +86,9 @@ public void close() { public boolean tryLock(long time, TimeUnit unit) { try { int numRetries = 0; - synchronized (LOCK) { + synchronized (LOCK_FILE_NAME) { while (fs.exists(this.lockFile)) { - LOCK.wait(retryWaitTimeMs); + LOCK_FILE_NAME.wait(retryWaitTimeMs); numRetries++; if (numRetries > retryMaxCount) { return false; @@ -110,7 +110,7 @@ public boolean tryLock(long time, TimeUnit unit) { @Override public void unlock() { - synchronized (LOCK) { + synchronized (LOCK_FILE_NAME) { try { if (fs.exists(this.lockFile)) { fs.delete(this.lockFile, true); @@ -147,7 +147,7 @@ private void acquireLock() { } protected String generateLogStatement(LockState state) { - return StringUtils.join(state.name(), " lock at", getLock()); + return StringUtils.join(state.name(), " lock at: ", getLock()); } private void checkRequiredProps(final LockConfiguration config) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index 10a9afbe751b0..c72824989938f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -113,7 +113,7 @@ public class HoodieLockConfig extends HoodieConfig { .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) .defaultValue(0) .sinceVersion("0.12.0") - .withDocumentation("For DFS based lock providers, expire time in minutes"); + .withDocumentation("For DFS based lock providers, expire time in minutes, must be a nonnegative number, default means no expire"); public static final ConfigProperty HIVE_DATABASE_NAME = ConfigProperty .key(HIVE_DATABASE_NAME_PROP_KEY) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java similarity index 67% rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 81915304bd141..bc5fc7f05aeba 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -17,16 +17,19 @@ * under the License. */ -package org.apache.hudi.client.transaction; +package org.apache.hudi.client; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hudi.client.transaction.lock.FileBasedLockProvider; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; import org.apache.hudi.exception.HoodieLockException; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -38,10 +41,10 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; public class TestFileBasedLockProvider { - private static final Logger LOG = LogManager.getLogger(TestFileBasedLockProvider.class); private static HdfsTestService hdfsTestService; private static MiniDFSCluster dfsCluster; private static LockConfiguration lockConfiguration; @@ -57,13 +60,38 @@ public static void setup() throws IOException { properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1"); properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000"); properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); lockConfiguration = new LockConfiguration(properties); } + @AfterAll + public static void cleanUpAfterAll() throws IOException { + Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); + FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); + FileStatus[] fileStatuses = dfsCluster.getFileSystem().listStatus(workDir); + for (FileStatus f : fileStatuses) { + fs.delete(f.getPath(), true); + } + if (hdfsTestService != null) { + hdfsTestService.stop(); + hdfsTestService = null; + } + } + + @AfterEach + public void cleanUpAfterEach() throws IOException { + Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); + FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); + FileStatus[] fileStatuses = dfsCluster.getFileSystem().listStatus(workDir); + for (FileStatus f : fileStatuses) { + fs.delete(f.getPath(), true); + } + } + @Test public void testAcquireLock() { - FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); @@ -71,7 +99,7 @@ public void testAcquireLock() { @Test public void testUnLock() { - FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); @@ -81,7 +109,7 @@ public void testUnLock() { @Test public void testReentrantLock() { - FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); try { @@ -96,7 +124,7 @@ public void testReentrantLock() { @Test public void testUnlockWithoutLock() { - FileBasedLockProvider fileBasedLockProvider = new FileBasedLockProvider(lockConfiguration, hadoopConf); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); fileBasedLockProvider.unlock(); } From 63568d74644e5ac867f677f68726ad7775896390 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Wed, 13 Jul 2022 23:47:52 +0800 Subject: [PATCH 05/14] [HUDI-4065] minor update --- .../apache/hudi/client/TestFileBasedLockProvider.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index bc5fc7f05aeba..20e150e026266 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -124,8 +124,13 @@ public void testReentrantLock() { @Test public void testUnlockWithoutLock() { - FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); - fileBasedLockProvider.unlock(); + try{ + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + fileBasedLockProvider.unlock(); + } + catch (HoodieLockException e) { + Assertions.fail(); + } } } From cae68aeca546c73e9454bb200210d16fdbe6dc7e Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 14 Jul 2022 00:07:03 +0800 Subject: [PATCH 06/14] [HUDI-4065] minor update --- .../org/apache/hudi/client/TestFileBasedLockProvider.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 20e150e026266..276d922ae0b2b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -124,11 +124,10 @@ public void testReentrantLock() { @Test public void testUnlockWithoutLock() { - try{ + try { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); fileBasedLockProvider.unlock(); - } - catch (HoodieLockException e) { + } catch (HoodieLockException e) { Assertions.fail(); } } From 4dca5abbeb3dacc65e41ec3c830d6093b3a1b586 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 14 Jul 2022 23:07:09 +0800 Subject: [PATCH 07/14] [HUDI-4065] add multiple writers ut for file lock --- .../lock/FileSystemBasedLockProvider.java | 9 +- .../apache/hudi/config/HoodieLockConfig.java | 2 +- .../client/TestFileBasedLockProvider.java | 11 +++ .../client/TestHoodieClientMultiWriter.java | 87 +++++++++++-------- 4 files changed, 71 insertions(+), 38 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index 030de192befea..04cfe316b4777 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.lock.LockState; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieLockException; import org.apache.log4j.LogManager; @@ -63,7 +64,10 @@ public class FileSystemBasedLockProvider implements LockProvider, Serial public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; - final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); + String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); + if (StringUtils.isNullOrEmpty(lockDirectory)) { + lockDirectory = this.lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key()); + } this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY); this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY); @@ -151,7 +155,8 @@ protected String generateLogStatement(LockState state) { } private void checkRequiredProps(final LockConfiguration config) { - ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY) != null); + ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY) != null + || this.lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key()) != null); ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY) > 0); ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY) > 0); ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index c72824989938f..f888aefd4577b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -107,7 +107,7 @@ public class HoodieLockConfig extends HoodieConfig { .key(FILESYSTEM_LOCK_PATH_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("For DFS based lock providers, path to store the locks under."); + .withDocumentation("For DFS based lock providers, path to store the locks under. If don't provide one, will use Table's meta path as default"); public static final ConfigProperty FILESYSTEM_LOCK_EXPIRE = ConfigProperty .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 276d922ae0b2b..8c371426cc162 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -27,6 +27,7 @@ import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieLockException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -97,6 +98,16 @@ public void testAcquireLock() { fileBasedLockProvider.unlock(); } + @Test + public void testAcquireLockWithDefaultPath() { + lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY); + lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/"); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + } + @Test public void testUnLock() { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 3aeca0f275891..96979ae99e349 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -43,9 +44,11 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.util.ArrayList; @@ -64,7 +67,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -74,6 +81,21 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { + private Properties lockProperties = null; + + @BeforeEach + public void setup() throws IOException { + if (lockProperties == null) { + lockProperties = new Properties(); + lockProperties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + lockProperties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1"); + lockProperties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + lockProperties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000"); + lockProperties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); + } + } + public void setUpMORTestTable() throws IOException { cleanupResources(); initPath(); @@ -90,15 +112,27 @@ public void clean() throws IOException { cleanupResources(); } + private static final List LOCK_PROVIDER_CLASSES = Arrays.asList( + InProcessLockProvider.class, + FileSystemBasedLockProvider.class); + + private static Iterable providerClassAndTableType() { + List opts = new ArrayList<>(); + for (Object providerClass : LOCK_PROVIDER_CLASSES) { + opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass}); + opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass}); + } + return opts; + } + @ParameterizedTest - @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) - public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception { + @MethodSource("providerClassAndTableType") + public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception { if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + HoodieWriteConfig writeConfig = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) @@ -106,8 +140,8 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) - .build()).withAutoCommit(false).withProperties(properties).build(); + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass) + .build()).withAutoCommit(false).withProperties(lockProperties).build(); // Create the first commit createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true); @@ -168,16 +202,6 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E assertTrue(writer1Completed.get() && writer2Completed.get()); } - @Test - public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception { - testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE); - } - - @Test - public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Exception { - testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ); - } - @ParameterizedTest @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception { @@ -185,11 +209,9 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table setUpMORTestTable(); } - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20"); HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -204,7 +226,7 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table .withAutoCommit(false) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) - .withProperties(properties) + .withProperties(lockProperties) .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST) .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) .build(); @@ -254,15 +276,13 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) { } } - private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception { + @ParameterizedTest + @MethodSource("providerClassAndTableType") + public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass) throws Exception { // create inserts X 1 if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); // Disabling embedded timeline server, it doesn't work with multiwriter HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false) @@ -275,8 +295,8 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType( FileSystemViewStorageType.MEMORY).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) - .build()).withAutoCommit(false).withProperties(properties); + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass) + .build()).withAutoCommit(false).withProperties(lockProperties); Set validInstants = new HashSet<>(); // Create the first commit with inserts HoodieWriteConfig cfg = writeConfigBuilder.build(); @@ -448,10 +468,7 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) @Test public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception { - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100"); HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) .withAutoClean(false).build()) @@ -459,7 +476,7 @@ public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) - .build()).withAutoCommit(true).withProperties(properties); + .build()).withAutoCommit(true).withProperties(lockProperties); HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieWriteConfig cfg2 = writeConfigBuilder.build(); From 47669516e1039019e1b7fa878813b9a88b5b80ae Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Thu, 14 Jul 2022 23:51:26 +0800 Subject: [PATCH 08/14] [HUDI-4065] delete the retry logic in FileSystemBasedLockProvider since duplicate with LockManager --- .../lock/FileSystemBasedLockProvider.java | 17 +++++------------ .../hudi/client/TestFileBasedLockProvider.java | 1 + 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index 04cfe316b4777..4c1748e132a90 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -89,26 +89,19 @@ public void close() { @Override public boolean tryLock(long time, TimeUnit unit) { try { - int numRetries = 0; synchronized (LOCK_FILE_NAME) { - while (fs.exists(this.lockFile)) { - LOCK_FILE_NAME.wait(retryWaitTimeMs); - numRetries++; - if (numRetries > retryMaxCount) { - return false; - } + if (fs.exists(this.lockFile)) { // Check whether lock is already expired or not, if so try to delete lock file if (lockTimeoutMinutes != 0 && checkIfExpired()) { - if (fs.delete(this.lockFile, true)) { - break; - } + fs.delete(this.lockFile, true); } } acquireLock(); return fs.exists(this.lockFile); } - } catch (IOException | InterruptedException e) { - throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); + } catch (IOException e) { + LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); + return false; } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 8c371426cc162..2090e3b351c98 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -106,6 +106,7 @@ public void testAcquireLockWithDefaultPath() { Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); + lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); } @Test From d5254f4cd5b470e5311b007b8151f660a0276373 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 15 Jul 2022 00:26:52 +0800 Subject: [PATCH 09/14] [HUDI-4065] fix ut --- .../lock/FileSystemBasedLockProvider.java | 12 ++++++------ .../hudi/client/TestFileBasedLockProvider.java | 14 +++----------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index 4c1748e132a90..c1e9d185d996d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -64,9 +64,9 @@ public class FileSystemBasedLockProvider implements LockProvider, Serial public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; - String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); + String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null); if (StringUtils.isNullOrEmpty(lockDirectory)) { - lockDirectory = this.lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key()); + lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null); } this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY); @@ -99,7 +99,7 @@ public boolean tryLock(long time, TimeUnit unit) { acquireLock(); return fs.exists(this.lockFile); } - } catch (IOException e) { + } catch (IOException | HoodieIOException e) { LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); return false; } @@ -129,7 +129,7 @@ private boolean checkIfExpired() { if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) { return true; } - } catch (IOException e) { + } catch (IOException | HoodieIOException e) { LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e); } return false; @@ -148,8 +148,8 @@ protected String generateLogStatement(LockState state) { } private void checkRequiredProps(final LockConfiguration config) { - ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY) != null - || this.lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key()) != null); + ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null + || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null); ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY) > 0); ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY) > 0); ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 2090e3b351c98..231d498514300 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -84,10 +84,7 @@ public static void cleanUpAfterAll() throws IOException { public void cleanUpAfterEach() throws IOException { Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); - FileStatus[] fileStatuses = dfsCluster.getFileSystem().listStatus(workDir); - for (FileStatus f : fileStatuses) { - fs.delete(f.getPath(), true); - } + fs.delete(new Path("/tmp/lock"), true); } @Test @@ -124,13 +121,8 @@ public void testReentrantLock() { FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); - try { - fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS); - Assertions.fail(); - } catch (HoodieLockException e) { - // expected - } + Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); } From 68c8625014965abcd82597b4defdbbe1b3f52d7e Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 15 Jul 2022 00:28:32 +0800 Subject: [PATCH 10/14] [HUDI-4065] fix ut --- .../org/apache/hudi/client/TestFileBasedLockProvider.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 231d498514300..b3b346cd5e19d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -70,10 +70,7 @@ public static void setup() throws IOException { public static void cleanUpAfterAll() throws IOException { Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); - FileStatus[] fileStatuses = dfsCluster.getFileSystem().listStatus(workDir); - for (FileStatus f : fileStatuses) { - fs.delete(f.getPath(), true); - } + fs.delete(new Path("/tmp/lock"), true); if (hdfsTestService != null) { hdfsTestService.stop(); hdfsTestService = null; From 8fce774d7b16d44948ad96ae4098d66fde6bb1cc Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 15 Jul 2022 00:29:58 +0800 Subject: [PATCH 11/14] [HUDI-4065] fix ut --- .../java/org/apache/hudi/client/TestFileBasedLockProvider.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index b3b346cd5e19d..21aab85ed3bb7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -20,7 +20,6 @@ package org.apache.hudi.client; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; From ef589521e7635f9eb2d015b4c5e9b19d95710528 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Fri, 15 Jul 2022 22:50:47 +0800 Subject: [PATCH 12/14] [HUDI-4065] use meta folder as default --- .../lock/FileSystemBasedLockProvider.java | 12 +++--------- .../hudi/client/TestFileBasedLockProvider.java | 1 + 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index c1e9d185d996d..6cf72698c9c9e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -40,8 +41,6 @@ import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; -import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; -import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; /** * A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations @@ -54,8 +53,6 @@ public class FileSystemBasedLockProvider implements LockProvider, Serial private static final String LOCK_FILE_NAME = "lock"; - private final int retryMaxCount; - private final int retryWaitTimeMs; private final int lockTimeoutMinutes; private transient FileSystem fs; private transient Path lockFile; @@ -66,10 +63,9 @@ public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, fi this.lockConfiguration = lockConfiguration; String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null); if (StringUtils.isNullOrEmpty(lockDirectory)) { - lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null); + lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) + + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; } - this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); - this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY); this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY); this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME); this.fs = FSUtils.getFs(this.lockFile.toString(), configuration); @@ -150,8 +146,6 @@ protected String generateLogStatement(LockState state) { private void checkRequiredProps(final LockConfiguration config) { ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null); - ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY) > 0); - ValidationUtils.checkArgument(config.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY) > 0); ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index 21aab85ed3bb7..f9074daf4f0c5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -70,6 +70,7 @@ public static void cleanUpAfterAll() throws IOException { Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); fs.delete(new Path("/tmp/lock"), true); + fs.delete(new Path("/tmp/.hoodie/lock"), true); if (hdfsTestService != null) { hdfsTestService.stop(); hdfsTestService = null; From 5fa8809f397422bbed089749b6ee75de0f9ad9d4 Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Sat, 16 Jul 2022 22:06:41 +0800 Subject: [PATCH 13/14] [HUDI-4065] minor update --- .../transaction/lock/FileSystemBasedLockProvider.java | 5 ++++- .../org/apache/hudi/client/TestFileBasedLockProvider.java | 7 +++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index 6cf72698c9c9e..264ed8b3d8110 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -88,7 +88,7 @@ public boolean tryLock(long time, TimeUnit unit) { synchronized (LOCK_FILE_NAME) { if (fs.exists(this.lockFile)) { // Check whether lock is already expired or not, if so try to delete lock file - if (lockTimeoutMinutes != 0 && checkIfExpired()) { + if (checkIfExpired()) { fs.delete(this.lockFile, true); } } @@ -120,6 +120,9 @@ public String getLock() { } private boolean checkIfExpired() { + if (lockTimeoutMinutes == 0) { + return false; + } try { long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime(); if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java index f9074daf4f0c5..208e9cd62e738 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -69,8 +69,7 @@ public static void setup() throws IOException { public static void cleanUpAfterAll() throws IOException { Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); - fs.delete(new Path("/tmp/lock"), true); - fs.delete(new Path("/tmp/.hoodie/lock"), true); + fs.delete(new Path("/tmp"), true); if (hdfsTestService != null) { hdfsTestService.stop(); hdfsTestService = null; @@ -109,8 +108,8 @@ public void testUnLock() { Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); fileBasedLockProvider.unlock(); - fileBasedLockProvider.tryLock(lockConfiguration.getConfig() - .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); } @Test From be5e2b1f70d9a99aae37e2e050121bccc2465f2c Mon Sep 17 00:00:00 2001 From: "jian.feng" Date: Mon, 18 Jul 2022 20:57:44 +0800 Subject: [PATCH 14/14] [HUDI-4065] minor update --- .../transaction/lock/FileSystemBasedLockProvider.java | 10 ++++------ .../java/org/apache/hudi/config/HoodieLockConfig.java | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java index 264ed8b3d8110..96a42e8409b29 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -63,7 +63,7 @@ public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, fi this.lockConfiguration = lockConfiguration; String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null); if (StringUtils.isNullOrEmpty(lockDirectory)) { - lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) + lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key()) + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; } this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY); @@ -86,11 +86,9 @@ public void close() { public boolean tryLock(long time, TimeUnit unit) { try { synchronized (LOCK_FILE_NAME) { - if (fs.exists(this.lockFile)) { - // Check whether lock is already expired or not, if so try to delete lock file - if (checkIfExpired()) { - fs.delete(this.lockFile, true); - } + // Check whether lock is already expired, if so try to delete lock file + if (fs.exists(this.lockFile) && checkIfExpired()) { + fs.delete(this.lockFile, true); } acquireLock(); return fs.exists(this.lockFile); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index f888aefd4577b..7fcc96810be2c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -107,7 +107,7 @@ public class HoodieLockConfig extends HoodieConfig { .key(FILESYSTEM_LOCK_PATH_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("For DFS based lock providers, path to store the locks under. If don't provide one, will use Table's meta path as default"); + .withDocumentation("For DFS based lock providers, path to store the locks under. use Table's meta path as default"); public static final ConfigProperty FILESYSTEM_LOCK_EXPIRE = ConfigProperty .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY)