diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java new file mode 100644 index 0000000000000..00f9bcd747427 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.HeartbeatManager; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager; +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StorageSchemes; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.util.Objects; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS; +import static org.apache.hudi.common.lock.LockState.ACQUIRED; +import static org.apache.hudi.common.lock.LockState.ACQUIRING; +import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE; +import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE; +import static org.apache.hudi.common.lock.LockState.RELEASED; +import static org.apache.hudi.common.lock.LockState.RELEASING; +import static org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME; + +/** + * A distributed filesystem storage based lock provider. This {@link LockProvider} implementation + * leverages conditional writes to ensure transactional consistency for multi-writer scenarios. + * The underlying storage client interface {@link StorageLockClient} is pluggable so it can be implemented for any + * filesystem which supports conditional writes. + */ +@ThreadSafe +public class StorageBasedLockProvider implements LockProvider { + + public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock.json"; + // How long to wait before retrying lock acquisition in blocking calls. + // Maximum expected clock drift between two nodes. + // This is similar idea as SkewAdjustingTimeGenerator. + // In reality, within a single cloud provider all nodes share the same ntp + // server + // therefore we do not expect drift more than a few ms. + // However, since our lock leases are pretty long, we can use a high buffer. + private static final long CLOCK_DRIFT_BUFFER_MS = 500; + + private static final Logger LOGGER = LoggerFactory.getLogger(StorageBasedLockProvider.class); + + // Use for testing + private final Logger logger; + + // The lock service implementation which interacts with storage + private final StorageLockClient storageLockClient; + + private final long lockValiditySecs; + private final String ownerId; + private final String lockFilePath; + private final HeartbeatManager heartbeatManager; + private final transient Thread shutdownThread; + + @GuardedBy("this") + private StorageLockFile currentLockObj = null; + @GuardedBy("this") + private boolean isClosed = false; + + private synchronized void setLock(StorageLockFile lockObj) { + if (lockObj != null && !Objects.equals(lockObj.getOwner(), this.ownerId)) { + throw new HoodieLockException("Owners do not match. Current lock owner: " + this.ownerId + " lock path: " + + this.lockFilePath + " owner: " + lockObj.getOwner()); + } + this.currentLockObj = lockObj; + } + + /** + * Default constructor for StorageBasedLockProvider, required by LockManager + * to instantiate it using reflection. + * + * @param lockConfiguration The lock configuration, should be transformable into + * StorageBasedLockConfig + * @param conf Storage config, ignored. + */ + public StorageBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration conf) { + this( + UUID.randomUUID().toString(), + lockConfiguration.getConfig(), + LockProviderHeartbeatManager::new, + getStorageLockClientClassName(), + LOGGER); + } + + private static Functions.Function3 getStorageLockClientClassName() { + return (ownerId, lockFilePath, lockConfig) -> { + try { + return (StorageLockClient) ReflectionUtils.loadClass( + getLockServiceClassName(new URI(lockFilePath).getScheme()), + new Class[]{String.class, String.class, Properties.class}, + new Object[]{ownerId, lockFilePath, lockConfig}); + } catch (Throwable e) { + throw new HoodieLockException("Failed to load and initialize StorageLock", e); + } + }; + } + + private static @NotNull String getLockServiceClassName(String scheme) { + Option schemeOptional = StorageSchemes.getStorageLockImplementationIfExists(scheme); + if (schemeOptional.isPresent()) { + return schemeOptional.get().getStorageLockClass(); + } else { + throw new HoodieNotSupportedException("No implementation of StorageLock supports this scheme: " + scheme); + } + } + + @VisibleForTesting + StorageBasedLockProvider( + String ownerId, + TypedProperties properties, + Functions.Function3, HeartbeatManager> heartbeatManagerLoader, + Functions.Function3 storageLockClientLoader, + Logger logger) { + StorageBasedLockConfig config = new StorageBasedLockConfig.Builder().fromProperties(properties).build(); + long heartbeatPollSeconds = config.getHeartbeatPollSeconds(); + this.lockValiditySecs = config.getValiditySeconds(); + this.lockFilePath = String.format( + "%s%s%s%s%s", + config.getHudiTableBasePath(), + StoragePath.SEPARATOR, + LOCKS_FOLDER_NAME, + StoragePath.SEPARATOR, + DEFAULT_TABLE_LOCK_FILE_NAME); + this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, TimeUnit.SECONDS.toMillis(heartbeatPollSeconds), this::renewLock); + this.storageLockClient = storageLockClientLoader.apply(ownerId, lockFilePath, properties); + this.ownerId = ownerId; + this.logger = logger; + shutdownThread = new Thread(() -> shutdown(true)); + Runtime.getRuntime().addShutdownHook(shutdownThread); + logger.info("Instantiated new storage-based lock provider, owner: {}, lockfilePath: {}", ownerId, lockFilePath); + } + + // ----------------------------------------- + // BASE METHODS + // ----------------------------------------- + + @Override + public synchronized StorageLockFile getLock() { + return currentLockObj; + } + + /** + * Attempts to acquire the lock within the given timeout. + */ + @Override + public boolean tryLock(long time, TimeUnit unit) { + long deadlineNanos = System.nanoTime() + unit.toNanos(time); + + while (System.nanoTime() < deadlineNanos) { + try { + logDebugLockState(ACQUIRING); + if (tryLock()) { + return true; + } + Thread.sleep(Long.parseLong(DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e); + } + } + + return false; + } + + @Override + public synchronized void close() { + shutdown(false); + } + + @VisibleForTesting + synchronized void shutdown(boolean fromShutdownHook) { + if (fromShutdownHook) { + // Try to expire the lock from the shutdown hook. + if (!isClosed && actuallyHoldsLock()) { + tryExpireCurrentLock(true); + } + // Do not execute any further actions, mark closed + this.isClosed = true; + return; + } else { + try { + tryRemoveShutdownHook(); + } catch (IllegalStateException e) { + logger.warn("Owner {}: Failed to remove shutdown hook, JVM is already shutting down.", ownerId, e); + } + } + try { + this.unlock(); + } catch (Exception e) { + logger.error("Owner {}: Failed to unlock current lock.", ownerId, e); + } + try { + this.storageLockClient.close(); + } catch (Exception e) { + logger.error("Owner {}: Lock service failed to close.", ownerId, e); + } + try { + this.heartbeatManager.close(); + } catch (Exception e) { + logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e); + } + + this.isClosed = true; + } + + @VisibleForTesting + void tryRemoveShutdownHook() { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + + private synchronized boolean isLockStillValid(StorageLockFile lock) { + return !lock.isExpired() && !isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntilMs()); + } + + /** + * Attempts a single pass to acquire the lock (non-blocking). + * + * @return true if lock acquired, false otherwise + */ + @Override + public synchronized boolean tryLock() { + assertHeartbeatManagerExists(); + assertUnclosed(); + logDebugLockState(ACQUIRING); + if (actuallyHoldsLock()) { + // Supports reentrant locks + return true; + } + + if (this.heartbeatManager.hasActiveHeartbeat()) { + logger.error("Detected broken invariant: there is an active heartbeat without a lock being held."); + // Breach of object invariant - we should never have an active heartbeat without + // holding a lock. + throw new HoodieLockException(generateLockStateMessage(FAILED_TO_ACQUIRE)); + } + + Pair> latestLock = this.storageLockClient.readCurrentLockFile(); + if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) { + logInfoLockState(FAILED_TO_ACQUIRE, "Failed to get the latest lock status"); + // We were not able to determine whether a lock was present. + return false; + } + + if (latestLock.getLeft() == LockGetResult.SUCCESS && isLockStillValid(latestLock.getRight().get())) { + String msg = String.format("Lock already held by %s", latestLock.getRight().get().getOwner()); + // Lock held by others. + logInfoLockState(FAILED_TO_ACQUIRE, msg); + return false; + } + + // Try to acquire the lock + StorageLockData newLockData = new StorageLockData(false, getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId); + Pair> lockUpdateStatus = this.storageLockClient.tryUpsertLockFile( + newLockData, + latestLock.getRight()); + if (lockUpdateStatus.getLeft() != LockUpsertResult.SUCCESS) { + // failed to acquire the lock, indicates concurrent contention + logInfoLockState(FAILED_TO_ACQUIRE); + return false; + } + this.setLock(lockUpdateStatus.getRight().get()); + + // There is a remote chance that + // - after lock is acquired but before heartbeat starts the lock is expired. + // - lock is acquired and heartbeat is up yet it does not run timely before the + // lock is expired + // It is mitigated by setting the lock validity period to a reasonably long + // period to survive until heartbeat comes, plus + // set the heartbeat interval relatively small enough. + if (!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) { + // Precondition "no active heartbeat" is checked previously, so when + // startHeartbeatForThread returns false, + // we are confident no heartbeat thread is running. + logErrorLockState(RELEASING, "We were unable to start the heartbeat!"); + tryExpireCurrentLock(false); + return false; + } + + logInfoLockState(ACQUIRED); + return true; + } + + /** + * Determines whether this provider currently holds a valid lock. + * + *

+ * This method checks both the existence of a lock object and its validity. A + * lock is considered + * valid only if it exists and has not expired according to its timestamp. + * + * @return {@code true} if this provider holds a valid lock, {@code false} + * otherwise + */ + private boolean actuallyHoldsLock() { + return believesLockMightBeHeld() && isLockStillValid(getLock()); + } + + /** + * Checks if this provider has a non-null lock object reference. + * + *

+ * A non-null lock object indicates that this provider has previously + * **successfully** acquired a lock via + * StorageBasedLockProvider##lock and has not yet **successfully** released + * it via StorageBasedLockProvider#unlock(). + * It is merely an indicator that the lock might be held by this provider. To + * truly certify we are the owner of the lock, + * StorageBasedLockProvider#actuallyHoldsLock should be used. + * + * @return {@code true} if this provider has a non-null lock object, + * {@code false} otherwise + * @see StorageBasedLockProvider#actuallyHoldsLock() + */ + private boolean believesLockMightBeHeld() { + return this.getLock() != null; + } + + /** + * Unlock by marking our current lock file "expired": true. + */ + @Override + public synchronized void unlock() { + assertHeartbeatManagerExists(); + if (!believesLockMightBeHeld()) { + return; + } + boolean believesNoLongerHoldsLock = true; + + // Try to stop the heartbeat first + if (heartbeatManager.hasActiveHeartbeat()) { + logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId); + believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true); + } + + // Then expire the current lock. + believesNoLongerHoldsLock &= tryExpireCurrentLock(false); + if (!believesNoLongerHoldsLock) { + throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE)); + } + } + + private void assertHeartbeatManagerExists() { + if (heartbeatManager == null) { + // broken function precondition. + throw new HoodieLockException("Unexpected null heartbeatManager"); + } + } + + private void assertUnclosed() { + if (this.isClosed) { + throw new HoodieLockException("Lock provider already closed"); + } + } + + /** + * Tries to expire the currently held lock. + * @param fromShutdownHook Whether we are attempting best effort quick unlock from shutdown hook. + * @return True if we were successfully able to upload an expired lock. + */ + private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) { + // It does not make sense to have heartbeat alive extending the lock lease while + // here we are trying + // to expire the lock. + if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) { + // broken function precondition. + throw new HoodieLockException("Must stop heartbeat before expire lock file"); + } + logDebugLockState(RELEASING); + // Upload metadata that will unlock this lock. + StorageLockData expiredLockData = new StorageLockData(true, this.getLock().getValidUntilMs(), ownerId); + Pair> result; + result = this.storageLockClient.tryUpsertLockFile(expiredLockData, Option.of(this.getLock())); + switch (result.getLeft()) { + case UNKNOWN_ERROR: + // Here we do not know the state of the lock. + logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown."); + return false; + case SUCCESS: + logInfoLockState(RELEASED); + setLock(null); + return true; + case ACQUIRED_BY_OTHERS: + // As we are confident no lock is held by itself, clean up the cached lock object. + // However, this is an edge case, so warn. + logWarnLockState(RELEASED, "lock should not have been acquired by others."); + setLock(null); + return true; + default: + throw new HoodieLockException("Unexpected lock update result: " + result.getLeft()); + } + } + + /** + * Renews (heartbeats) the current lock if we are the holder, it forcefully set + * the expiration flag + * to false and the lock expiration time to a later time in the future. + * @return True if we successfully renewed the lock, false if not. + */ + @VisibleForTesting + protected synchronized boolean renewLock() { + try { + // If we don't hold the lock, no-op. + if (!believesLockMightBeHeld()) { + logger.warn("Owner {}: Cannot renew, no lock held by this process", ownerId); + // No need to extend lock lease. + return false; + } + + long oldExpirationMs = getLock().getValidUntilMs(); + // Attempt conditional update, extend lock. There are 3 cases: + // 1. Happy case: lock has not expired yet, we extend the lease to a longer + // period. + // 2. Corner case 1: lock is expired and is acquired by others, lock renewal + // failed with ACQUIRED_BY_OTHERS. + // 3. Corner case 2: lock is expired but no one has acquired it yet, lock + // renewal "revived" the expired lock. + // Please note we expect the corner cases almost never happens. + // Action taken for corner case 2 is just a best effort mitigation. At least it + // prevents further data corruption by + // letting someone else acquire the lock. + Pair> currentLock = this.storageLockClient.tryUpsertLockFile( + new StorageLockData(false, getCurrentEpochMs() + TimeUnit.SECONDS.toMillis(lockValiditySecs), ownerId), + Option.of(getLock())); + switch (currentLock.getLeft()) { + case ACQUIRED_BY_OTHERS: + logger.error("Owner {}: Unable to renew lock as it is acquired by others.", ownerId); + // No need to extend lock lease anymore. + return false; + case UNKNOWN_ERROR: + // This could be transient, but unclear, we will let the heartbeat continue + // normally. + // If the next heartbeat run identifies our lock has expired we will error out. + logger.warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", ownerId); + // Let heartbeat retry later. + return true; + case SUCCESS: + // Only positive outcome + this.setLock(currentLock.getRight().get()); + logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.", + ownerId, oldExpirationMs - getCurrentEpochMs(), lockFilePath); + // Let heartbeat continue to renew lock lease again later. + return true; + default: + throw new HoodieLockException("Unexpected lock update result: " + currentLock.getLeft()); + } + } catch (Exception e) { + logger.error("Owner {}: Exception occurred while renewing lock", ownerId, e); + return false; + } + } + + // --------- + // Utilities + // --------- + + /** + * Method to calculate whether a timestamp from a distributed source has + * definitively occurred yet. + */ + protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epochMs) { + return getCurrentEpochMs() > epochMs + CLOCK_DRIFT_BUFFER_MS; + } + + private String generateLockStateMessage(LockState state) { + String threadName = Thread.currentThread().getName(); + return String.format( + "Owner %s: Lock file path %s, Thread %s, Storage based lock state %s", + ownerId, + lockFilePath, + threadName, + state.toString()); + } + + private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}"; + private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}, {}"; + + private void logDebugLockState(LockState state) { + logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, Thread.currentThread(), state); + } + + private void logInfoLockState(LockState state) { + logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, Thread.currentThread(), state); + } + + private void logInfoLockState(LockState state, String msg) { + logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg); + } + + private void logWarnLockState(LockState state, String msg) { + logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg); + } + + private void logErrorLockState(LockState state, String msg) { + logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg); + } + + @VisibleForTesting + long getCurrentEpochMs() { + return System.currentTimeMillis(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java index be73bd7cb2c0e..a91193d5dc05f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java @@ -167,7 +167,7 @@ public LockProviderHeartbeatManager(String ownerId, */ private static ScheduledExecutorService createThreadScheduler(String shortUuid) { return Executors.newSingleThreadScheduledExecutor( - r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + shortUuid)); + r -> new Thread(r, "LockProvider-HeartbeatManager-Thread-" + shortUuid)); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java new file mode 100644 index 0000000000000..8f4d4779ad1ce --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; + +public class StorageBasedLockConfig extends HoodieConfig { + private static final String SINCE_VERSION_1_0_2 = "1.0.2"; + private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + + "storage."; + + public static final ConfigProperty VALIDITY_TIMEOUT_SECONDS = ConfigProperty + .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs") + .defaultValue(TimeUnit.MINUTES.toSeconds(5)) + .markAdvanced() + .sinceVersion(SINCE_VERSION_1_0_2) + .withDocumentation( + "For storage-based lock provider, the amount of time in seconds each new lock is valid for. " + + "The lock provider will attempt to renew its lock until it successfully extends the lock lease period " + + "or the validity timeout is reached."); + + public static final ConfigProperty HEARTBEAT_POLL_SECONDS = ConfigProperty + .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs") + .defaultValue(30L) + .markAdvanced() + .sinceVersion(SINCE_VERSION_1_0_2) + .withDocumentation( + "For storage-based lock provider, the amount of time in seconds to wait before renewing the lock. " + + "Defaults to 30 seconds."); + + public long getValiditySeconds() { + return getLong(VALIDITY_TIMEOUT_SECONDS); + } + + public long getHeartbeatPollSeconds() { + return getLong(HEARTBEAT_POLL_SECONDS); + } + + public String getHudiTableBasePath() { + return getString(BASE_PATH); + } + + public static class Builder { + private final StorageBasedLockConfig lockConfig = new StorageBasedLockConfig(); + + public StorageBasedLockConfig build() { + lockConfig.setDefaults(StorageBasedLockConfig.class.getName()); + return lockConfig; + } + + public StorageBasedLockConfig.Builder fromProperties(TypedProperties props) { + lockConfig.getProps().putAll(props); + checkRequiredProps(); + return this; + } + + private void checkRequiredProps() { + String notExistsMsg = " does not exist!"; + if (!lockConfig.contains(BASE_PATH)) { + throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg); + } + if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) + * 10) { + throw new IllegalArgumentException( + VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal to 10x " + HEARTBEAT_POLL_SECONDS.key()); + } + if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 10) { + throw new IllegalArgumentException( + VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal to 10 seconds."); + } + if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) { + throw new IllegalArgumentException( + HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal to 1 second."); + } + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java new file mode 100644 index 0000000000000..4e95c33371f8f --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java @@ -0,0 +1,669 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.LockUpsertResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.client.transaction.lock.models.HeartbeatManager; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.storage.StorageConfiguration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; + +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.refEq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit test class for StorageBasedLockProvider + */ +class TestStorageBasedLockProvider { + private StorageBasedLockProvider lockProvider; + private StorageLockClient mockLockService; + private HeartbeatManager mockHeartbeatManager; + private Logger mockLogger; + private final String ownerId = UUID.randomUUID().toString(); + private static final int DEFAULT_LOCK_VALIDITY_MS = 10000; + + @BeforeEach + void setupLockProvider() { + mockLockService = mock(StorageLockClient.class); + mockHeartbeatManager = mock(HeartbeatManager.class); + mockLogger = mock(Logger.class); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + TypedProperties props = new TypedProperties(); + props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10"); + props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1"); + props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default"); + + lockProvider = spy(new StorageBasedLockProvider( + ownerId, + props, + (a,b,c) -> mockHeartbeatManager, + (a,b,c) -> mockLockService, + mockLogger)); + } + + @AfterEach + void cleanupLockProvider() { + lockProvider.close(); + } + + @Test + void testValidLockStorageLocation() { + TypedProperties props = new TypedProperties(); + props.put(BASE_PATH.key(), "s3://bucket/lake/db/tbl-default"); + + LockConfiguration lockConf = new LockConfiguration(props); + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + + HoodieLockException ex = assertThrows(HoodieLockException.class, + () -> new StorageBasedLockProvider(lockConf, storageConf)); + assertTrue(ex.getMessage().contains("Failed to load and initialize StorageLock")); + } + + @ParameterizedTest + @ValueSource(strings = { "gs://bucket/lake/db/tbl-default", "s3://bucket/lake/db/tbl-default", + "s3a://bucket/lake/db/tbl-default" }) + void testNonExistentWriteServiceWithDefaults(String tableBasePathString) { + TypedProperties props = new TypedProperties(); + props.put(BASE_PATH.key(), tableBasePathString); + + LockConfiguration lockConf = new LockConfiguration(props); + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + + HoodieLockException ex = assertThrows(HoodieLockException.class, + () -> new StorageBasedLockProvider(lockConf, storageConf)); + assertTrue(ex.getMessage().contains("Failed to load and initialize StorageLock")); + } + + @Test + void testTryLockForTimeUnitThrowsOnInterrupt() throws Exception { + doReturn(false).when(lockProvider).tryLock(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + lockProvider.tryLock(1, TimeUnit.SECONDS); + } catch (HoodieLockException e) { + latch.countDown(); + } + }); + t.start(); + Thread.sleep(50); + t.interrupt(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + void testTryLockForTimeUnitAcquiresLockEventually() throws Exception { + AtomicInteger count = new AtomicInteger(0); + doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + assertTrue(lockProvider.tryLock(4, TimeUnit.SECONDS)); + latch.countDown(); + }); + t.start(); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception { + AtomicInteger count = new AtomicInteger(0); + doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + assertFalse(lockProvider.tryLock(1, TimeUnit.SECONDS)); + latch.countDown(); + }); + t.start(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + void testTryLockSuccess() { + long t0 = 1_000L; + when(lockProvider.getCurrentEpochMs()) + .thenReturn(t0); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, t0 + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(refEq(data), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + assertEquals(realLockFile, lockProvider.getLock()); + verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any()); + } + + @Test + void testTryLockSuccessButFailureToStartHeartbeat() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile)))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + + boolean acquired = lockProvider.tryLock(); + assertFalse(acquired); + } + + @Test + void testTryLockFailsFromOwnerMismatch() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockFile returnedLockFile = new StorageLockFile( + new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(returnedLockFile))); + + HoodieLockException ex = assertThrows(HoodieLockException.class, () -> lockProvider.tryLock()); + assertTrue(ex.getMessage().contains("Owners do not match")); + } + + @Test + void testTryLockFailsDueToExistingLock() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, + "other-owner"); + StorageLockFile existingLock = new StorageLockFile(data, "v2"); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS, Option.of(existingLock))); + + boolean acquired = lockProvider.tryLock(); + assertFalse(acquired); + } + + @Test + void testTryLockFailsToUpdateFile() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, Option.empty())); + assertFalse(lockProvider.tryLock()); + } + + @Test + void testTryLockFailsDueToUnknownState() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, Option.empty())); + assertFalse(lockProvider.tryLock()); + } + + @Test + void testTryLockSucceedsWhenExistingLockExpiredByTime() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, + "other-owner"); + StorageLockFile existingLock = new StorageLockFile(data, "v2"); + StorageLockData newData = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile realLockFile = new StorageLockFile(newData, "v1"); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS, Option.of(existingLock))); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(existingLock)))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + } + + @Test + void testTryLockReentrancySucceeds() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + // Re-entrancy succeeds + assertTrue(lockProvider.tryLock()); + } + + @Test + void testTryLockReentrancyAfterLockExpiredByTime() { + // In an extremely unlikely scenario, we could have a local reference to a lock + // which is present but expired, + // and because we were unable to stop the heartbeat properly, we did not + // successfully set it to null. + // Due to the nature of the heartbeat manager, this is expected to introduce + // some delay, but not be permanently blocking. + // There are a few variations of this edge case, so we must test them all. + + // Here the lock is still "unexpired" but the time shows expired. + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile expiredLock = new StorageLockFile(data, "v1"); + doReturn(expiredLock).when(lockProvider).getLock(); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile validLock = new StorageLockFile(validData, "v2"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + assertTrue(lockProvider.tryLock()); + } + + @Test + void testTryLockReentrancyAfterLockSetExpired() { + // In an extremely unlikely scenario, we could have a local reference to a lock + // which is present but expired, + // and because we were unable to stop the heartbeat properly, we did not + // successfully set it to null. + // Due to the nature of the heartbeat manager, this is expected to introduce + // some delay, but not be permanently blocking. + // There are a few variations of this edge case, so we must test them all. + + // Here the lock is "expired" but the time shows unexpired. + StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile expiredLock = new StorageLockFile(data, "v1"); + doReturn(expiredLock).when(lockProvider).getLock(); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile validLock = new StorageLockFile(validData, "v2"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(validLock))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + assertTrue(lockProvider.tryLock()); + } + + @Test + void testTryLockHeartbeatStillActive() { + // In an extremely unlikely scenario, we could have a local reference to a lock + // which is present but expired, + // and because we were unable to stop the heartbeat properly, we did not + // successfully set it to null. + // Due to the nature of the heartbeat manager, this is expected to introduce + // some delay, but not be permanently blocking. + // There are a few variations of this edge case, so we must test them all. + + // Here the heartbeat is still active, so we have to error out. + StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile expiredLock = new StorageLockFile(data, "v1"); + doReturn(expiredLock).when(lockProvider).getLock(); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + assertThrows(HoodieLockException.class, () -> lockProvider.tryLock()); + } + + @Test + void testUnlockSucceedsAndReentrancy() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(realLockFile)))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, + Option.of(new StorageLockFile(new StorageLockData(true, data.getValidUntil(), ownerId), "v2")))); + assertTrue(lockProvider.tryLock()); + when(mockHeartbeatManager.hasActiveHeartbeat()) + .thenReturn(true) // when we try to stop the heartbeat we will check if heartbeat is active return true. + .thenReturn(false); // when try to set lock to expire we will assert no active heartbeat as a precondition. + lockProvider.unlock(); + assertNull(lockProvider.getLock()); + lockProvider.unlock(); + } + + @Test + void testUnlockFailsToStopHeartbeat() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + assertThrows(HoodieLockException.class, () -> lockProvider.unlock()); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + } + + @Test + void testCloseFailsToStopHeartbeat() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + // Should wrap the exception and log error. + lockProvider.close(); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + } + + @Test + void testRenewLockReturnsFalseWhenNoLockHeld() { + doReturn(null).when(lockProvider).getLock(); + assertFalse(lockProvider.renewLock()); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this process", this.ownerId); + } + + @Test + void testRenewLockWithoutHoldingLock() { + doReturn(null).when(lockProvider).getLock(); + assertFalse(lockProvider.renewLock()); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this process", this.ownerId); + } + + @Test + void testRenewLockWithFullyExpiredLock() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile nearExpiredLockFile = new StorageLockFile(data, "v1"); + doReturn(nearExpiredLockFile).when(lockProvider).getLock(); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(nearExpiredLockFile)))) + .thenReturn(Pair.of(LockUpsertResult.ACQUIRED_BY_OTHERS, null)); + assertFalse(lockProvider.renewLock()); + verify(mockLogger).error("Owner {}: Unable to renew lock as it is acquired by others.", this.ownerId); + } + + @Test + void testRenewLockUnableToUpsertLockFileButNotFatal() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + // Signal the upsert attempt failed, but may be transient. See interface for + // more details. + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile)))) + .thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty())); + assertTrue(lockProvider.renewLock()); + } + + @Test + void testRenewLockUnableToUpsertLockFileFatal() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + // Signal the upsert attempt failed, but may be transient. See interface for + // more details. + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile)))) + .thenReturn(Pair.of(LockUpsertResult.UNKNOWN_ERROR, Option.empty())); + // renewLock return true so it will be retried. + assertTrue(lockProvider.renewLock()); + + verify(mockLogger).warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", this.ownerId); + } + + @Test + void testRenewLockSucceedsButRenewalWithinExpirationWindow() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + + StorageLockData nearExpirationData = new StorageLockData(false, System.currentTimeMillis(), ownerId); + StorageLockFile lockFileNearExpiration = new StorageLockFile(nearExpirationData, "v2"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile)))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(lockFileNearExpiration))); + + // We used to fail in this case before, but since we are only modifying a single + // lock file, this is ok now. + // Therefore, this can be a happy path variation. + assertTrue(lockProvider.renewLock()); + } + + @Test + void testRenewLockSucceeds() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + + StorageLockData successData = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile successLockFile = new StorageLockFile(successData, "v2"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile)))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(successLockFile))); + assertTrue(lockProvider.renewLock()); + + verify(mockLogger).info( + eq("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}."), + eq(this.ownerId), anyLong(), eq("gs://bucket/lake/db/tbl-default/.hoodie/.locks/table_lock.json")); + } + + @Test + void testRenewLockFails() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + + when(mockLockService.tryUpsertLockFile(any(), eq(Option.of(lockFile)))) + .thenThrow(new RuntimeException("Failure")); + assertFalse(lockProvider.renewLock()); + + verify(mockLogger).error(eq("Owner {}: Exception occurred while renewing lock"), eq(ownerId), + any(RuntimeException.class)); + } + + @Test + void testCloseCallsDependencies() throws Exception { + lockProvider.close(); + verify(mockLockService, atLeastOnce()).close(); + verify(mockHeartbeatManager, atLeastOnce()).close(); + assertNull(lockProvider.getLock()); + } + + @Test + void testCloseWithErrorForLockService() throws Exception { + doThrow(new RuntimeException("Some failure")).when(mockLockService).close(); + lockProvider.close(); + verify(mockLogger).error(eq("Owner {}: Lock service failed to close."), eq(ownerId), any(RuntimeException.class)); + assertNull(lockProvider.getLock()); + } + + @Test + void testCloseWithErrorForHeartbeatManager() throws Exception { + doThrow(new RuntimeException("Some failure")).when(mockHeartbeatManager).close(); + lockProvider.close(); + verify(mockLogger).error(eq("Owner {}: Heartbeat manager failed to close."), eq(ownerId), + any(RuntimeException.class)); + assertNull(lockProvider.getLock()); + } + + @Test + public void testShutdownHookViaReflection() throws Exception { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, Option.empty())); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryUpsertLockFile(any(), eq(Option.empty()))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + assertEquals(realLockFile, lockProvider.getLock()); + verify(mockLockService, atLeastOnce()).tryUpsertLockFile(any(), any()); + + when(mockLockService.tryUpsertLockFile(any(StorageLockData.class), eq(Option.of(realLockFile)))) + .thenReturn(Pair.of(LockUpsertResult.SUCCESS, Option.of(realLockFile))); + + // Mock shutdown + lockProvider.shutdown(true); + + // Verify that the expected shutdown behaviors occurred. + assertNull(lockProvider.getLock()); + // We do not execute additional actions + verify(mockLockService, never()).close(); + verify(mockHeartbeatManager, never()).close(); + } + + @Test + public void testShutdownHookWhenNoLockPresent() throws Exception { + // Now, when calling shutdown(true), the method should immediately return. + lockProvider.shutdown(true); + + // Verify that unlock or close methods are NOT invoked, or adjust expectations accordingly. + verify(mockLockService, never()).close(); + verify(mockHeartbeatManager, never()).close(); + } + + @Test + public void testShutdownHookFailsToBeRemoved() throws Exception { + doThrow(new IllegalStateException("Shutdown already in progress")).when(lockProvider).tryRemoveShutdownHook(); + lockProvider.close(); + verify(mockLockService, atLeastOnce()).close(); + verify(mockHeartbeatManager, atLeastOnce()).close(); + assertNull(lockProvider.getLock()); + } + + @Test + void testShutdownHookFiresDuringTryLockWithTimeout() throws Exception { + // This test simulates the scenario where the shutdown hook fires while tryLock(long time, TimeUnit unit) + // is in progress, and expects that HoodieLockException is thrown when tryLock is called after shutdown. + + // Setup mocks to simulate lock being held by another owner (to keep tryLock looping) + StorageLockData otherOwnerData = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "other-owner"); + StorageLockFile otherOwnerLock = new StorageLockFile(otherOwnerData, "v1"); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS, Option.of(otherOwnerLock))); + + CountDownLatch tryLockStarted = new CountDownLatch(1); + CountDownLatch proceedWithShutdown = new CountDownLatch(1); + CountDownLatch shutdownCompleted = new CountDownLatch(1); + CountDownLatch tryLockCompleted = new CountDownLatch(1); + CountDownLatch exceptionThrown = new CountDownLatch(1); + + // Spy on the real tryLock to know when it's been called and coordinate with shutdown + AtomicInteger tryLockCallCount = new AtomicInteger(0); + doAnswer(inv -> { + int count = tryLockCallCount.incrementAndGet(); + if (count == 1) { + // First call - signal that tryLock has started + tryLockStarted.countDown(); + // Wait for shutdown to be triggered + assertTrue(proceedWithShutdown.await(2, TimeUnit.SECONDS)); + } else { + // Subsequent calls - wait briefly for shutdown to complete + assertTrue(shutdownCompleted.await(100, TimeUnit.MILLISECONDS)); + } + // Call the real method + return inv.callRealMethod(); + }).when(lockProvider).tryLock(); + + // Start a thread that will call tryLock with timeout + Thread tryLockThread = new Thread(() -> { + try { + lockProvider.tryLock(2, TimeUnit.SECONDS); + // Should not reach here - exception should be thrown after shutdown + fail("Should have thrown HoodieLockException after shutdown"); + } catch (HoodieLockException e) { + // Expected - tryLock should throw exception after shutdown + exceptionThrown.countDown(); + } finally { + tryLockCompleted.countDown(); + } + }); + + tryLockThread.start(); + + // Wait for tryLock to start + assertTrue(tryLockStarted.await(2, TimeUnit.SECONDS), "tryLock should have started"); + + // Now invoke the shutdown hook while tryLock is in progress + // Invoke shutdown in a separate thread to simulate shutdown hook + Thread shutdownThread = new Thread(() -> { + proceedWithShutdown.countDown(); // Signal tryLock to proceed + lockProvider.shutdown(true); + shutdownCompleted.countDown(); // Signal that shutdown is complete + }); + shutdownThread.start(); + + // Wait for both operations to complete + assertTrue(tryLockCompleted.await(5, TimeUnit.SECONDS), "tryLock should complete"); + assertTrue(exceptionThrown.await(1, TimeUnit.SECONDS), "HoodieLockException should have been thrown"); + shutdownThread.join(2000); + + // Verify the state after shutdown + // The lock should be null after shutdown + assertNull(lockProvider.getLock(), "Lock should be null after shutdown hook fires"); + + // Verify that tryLock was called at least once + verify(lockProvider, atLeastOnce()).tryLock(); + } + + public static class StubStorageLockClient implements StorageLockClient { + public StubStorageLockClient(String ownerId, String lockFileUri, Properties props) { + assertTrue(lockFileUri.endsWith("table_lock.json")); + } + + @Override + public Pair> tryUpsertLockFile( + StorageLockData newLockData, + Option previousLockFile) { + return null; + } + + @Override + public Pair> readCurrentLockFile() { + return null; + } + + @Override + public void close() throws Exception { + // stub, no-op + } + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java index cdeca536b3ae3..234aebe70b762 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/TestLockProviderHeartbeatManager.java @@ -156,13 +156,13 @@ void testHeartbeatUnableToAcquireSemaphore() throws InterruptedException { when(semaphore.tryAcquire()).thenReturn(false); when(semaphore.tryAcquire(eq(DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS), eq(TimeUnit.MILLISECONDS))).thenReturn(true); manager = new LockProviderHeartbeatManager( - LOGGER_ID, - mockScheduler, - 100L, - DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, - () -> true, - semaphore, - mockLogger); + LOGGER_ID, + mockScheduler, + 100L, + DEFAULT_STOP_HEARTBEAT_TIMEOUT_MS, + () -> true, + semaphore, + mockLogger); assertTrue(manager.startHeartbeatForThread(Thread.currentThread())); t.get().start(); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java new file mode 100644 index 0000000000000..4855b65058772 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.TypedProperties; + +import org.junit.jupiter.api.Test; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestStorageBasedLockConfig { + + @Test + void testDefaultValues() { + // Asserts that the defaults are correct + TypedProperties props = new TypedProperties(); + props.setProperty(BASE_PATH.key(), "s3://hudi-bucket/table/basepath"); + + StorageBasedLockConfig.Builder builder = new StorageBasedLockConfig.Builder(); + StorageBasedLockConfig config = builder + .fromProperties(props) + .build(); + + assertEquals(5 * 60, config.getValiditySeconds(), "Default lock validity should be 5 minutes"); + assertEquals(30, config.getHeartbeatPollSeconds(), "Default heartbeat poll time should be 30 seconds"); + } + + @Test + void testCustomValues() { + // Testing that custom values which differ from defaults can be read properly + TypedProperties props = new TypedProperties(); + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "120"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "10"); + props.setProperty(BASE_PATH.key(), "/hudi/table/basepath"); + + StorageBasedLockConfig config = new StorageBasedLockConfig.Builder() + .fromProperties(props) + .build(); + + assertEquals(120, config.getValiditySeconds()); + assertEquals(10, config.getHeartbeatPollSeconds()); + assertEquals("/hudi/table/basepath", config.getHudiTableBasePath()); + } + + @Test + void testBasePathPropertiesValidation() { + // Tests that validations around the base path are present. + TypedProperties props = new TypedProperties(); + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "120"); + StorageBasedLockConfig.Builder propsBuilder = new StorageBasedLockConfig.Builder(); + + // Missing base path + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(BASE_PATH.key())); + } + + @Test + void testTimeThresholds() { + // Ensure that validations which restrict the time-based inputs are working. + TypedProperties props = new TypedProperties(); + props.setProperty(BASE_PATH.key(), "/hudi/table/basepath"); + // Invalid config case: validity timeout is less than 10x of heartbeat poll period + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "3"); + StorageBasedLockConfig.Builder propsBuilder = new StorageBasedLockConfig.Builder(); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key())); + // Invalid config case: validity timeout is less than 10 seconds + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "9"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1"); + exception = assertThrows(IllegalArgumentException.class, () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key())); + // Invalid config case: heartbeat poll period is less than 1 second + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "10"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "0"); + exception = assertThrows(IllegalArgumentException.class, () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key())); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 2c3e2b515bbbc..3102e5e1e53f7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -97,6 +97,8 @@ public class HoodieTableMetaClient implements Serializable { public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".fileids"; + public static final String LOCKS_FOLDER_NAME = METAFOLDER_NAME + StoragePath.SEPARATOR + ".locks"; + public static final String SCHEMA_FOLDER_NAME = ".schema"; public static final String MARKER_EXTN = ".marker"; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java index 1b1d32e4ac37e..aa4932c0cc6c8 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java @@ -84,6 +84,15 @@ public void testStorageSchemes() { assertThrows(IllegalArgumentException.class, () -> { StorageSchemes.isAppendSupported("s2"); }, "Should throw exception for unsupported schemes"); + + for (StorageSchemes scheme : StorageSchemes.values()) { + String schemeName = scheme.getScheme(); + if (scheme.getScheme().startsWith("s3") || scheme.getScheme().startsWith("gs")) { + assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent()); + } else { + assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent()); + } + } } @Test diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java index 129956166b3ac..68c0a068188ed 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java @@ -21,68 +21,71 @@ import java.util.Arrays; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; + /** * All the supported storage schemes in Hoodie. */ public enum StorageSchemes { // Local filesystem - FILE("file", false, false, true, true), + FILE("file", false, false, true, true, null), // Hadoop File System - HDFS("hdfs", true, false, true, false), + HDFS("hdfs", true, false, true, false, null), // Baidu Advanced File System - AFS("afs", true, null, null, null), + AFS("afs", true, null, null, null, null), // Mapr File System - MAPRFS("maprfs", true, null, null, null), + MAPRFS("maprfs", true, null, null, null, null), // Apache Ignite FS - IGNITE("igfs", true, null, null, null), + IGNITE("igfs", true, null, null, null, null), // AWS S3 - S3A("s3a", false, true, null, true), - S3("s3", false, true, null, true), + S3A("s3a", false, true, null, true, "org.apache.hudi.aws.transaction.lock.S3StorageLockClient"), + S3("s3", false, true, null, true, "org.apache.hudi.aws.transaction.lock.S3StorageLockClient"), // Google Cloud Storage - GCS("gs", false, true, null, true), + GCS("gs", false, true, null, true, "org.apache.hudi.gcp.transaction.lock.GCSStorageLockClient"), // Azure WASB - WASB("wasb", false, null, null, null), WASBS("wasbs", false, null, null, null), + WASB("wasb", false, null, null, null, null), WASBS("wasbs", false, null, null, null, null), // Azure ADLS - ADL("adl", false, null, null, null), + ADL("adl", false, null, null, null, null), // Azure ADLS Gen2 - ABFS("abfs", false, null, null, null), ABFSS("abfss", false, null, null, null), + ABFS("abfs", false, null, null, null, null), ABFSS("abfss", false, null, null, null, null), // Aliyun OSS - OSS("oss", false, null, null, null), + OSS("oss", false, null, null, null, null), // View FS for federated setups. If federating across cloud stores, then append support is false // View FS support atomic creation - VIEWFS("viewfs", true, null, true, null), + VIEWFS("viewfs", true, null, true, null, null), //ALLUXIO - ALLUXIO("alluxio", false, null, null, null), + ALLUXIO("alluxio", false, null, null, null, null), // Tencent Cloud Object Storage - COSN("cosn", false, null, null, null), + COSN("cosn", false, null, null, null, null), // Tencent Cloud HDFS - CHDFS("ofs", true, null, null, null), + CHDFS("ofs", true, null, null, null, null), // Tencent Cloud CacheFileSystem - GOOSEFS("gfs", false, null, null, null), + GOOSEFS("gfs", false, null, null, null, null), // Databricks file system - DBFS("dbfs", false, null, null, null), + DBFS("dbfs", false, null, null, null, null), // IBM Cloud Object Storage - COS("cos", false, null, null, null), + COS("cos", false, null, null, null, null), // Huawei Cloud Object Storage - OBS("obs", false, null, null, null), + OBS("obs", false, null, null, null, null), // Kingsoft Standard Storage ks3 - KS3("ks3", false, null, null, null), + KS3("ks3", false, null, null, null, null), // Netease Object Storage nos - NOS("nos", false, null, null, null), + NOS("nos", false, null, null, null, null), // JuiceFileSystem - JFS("jfs", true, null, null, null), + JFS("jfs", true, null, null, null, null), // Baidu Object Storage - BOS("bos", false, null, null, null), + BOS("bos", false, null, null, null, null), // Oracle Cloud Infrastructure Object Storage - OCI("oci", false, null, null, null), + OCI("oci", false, null, null, null, null), // Volcengine Object Storage - TOS("tos", false, null, null, null), + TOS("tos", false, null, null, null, null), // Volcengine Cloud HDFS - CFS("cfs", true, null, null, null), + CFS("cfs", true, null, null, null, null), // Aliyun Apsara File Storage for HDFS - DFS("dfs", true, false, true, null), + DFS("dfs", true, false, true, null, null), // Hopsworks File System - HOPSFS("hopsfs", false, false, true, null); + HOPSFS("hopsfs", false, false, true, null, null); private String scheme; private boolean supportsAppend; @@ -94,13 +97,15 @@ public enum StorageSchemes { // when we want to get only part of files under a directory rather than all files, use getStatus may be more friendly than listStatus. // here is a trade-off between rpc times and throughput of storage meta service private Boolean listStatusFriendly; + private String storageLockClass; - StorageSchemes(String scheme, boolean supportsAppend, Boolean isWriteTransactional, Boolean supportAtomicCreation, Boolean listStatusFriendly) { + StorageSchemes(String scheme, boolean supportsAppend, Boolean isWriteTransactional, Boolean supportAtomicCreation, Boolean listStatusFriendly, String storageLockClass) { this.scheme = scheme; this.supportsAppend = supportsAppend; this.isWriteTransactional = isWriteTransactional; this.supportAtomicCreation = supportAtomicCreation; this.listStatusFriendly = listStatusFriendly; + this.storageLockClass = storageLockClass; } public String getScheme() { @@ -123,6 +128,14 @@ public boolean getListStatusFriendly() { return listStatusFriendly != null && listStatusFriendly; } + public boolean implementsStorageLock() { + return !StringUtils.isNullOrEmpty(storageLockClass); + } + + public String getStorageLockClass() { + return storageLockClass; + } + public static boolean isSchemeSupported(String scheme) { return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme)); } @@ -155,4 +168,12 @@ public static boolean isListStatusFriendly(String scheme) { } return Arrays.stream(StorageSchemes.values()).anyMatch(s -> s.getListStatusFriendly() && s.scheme.equals(scheme)); } + + public static Option getStorageLockImplementationIfExists(String scheme) { + if (!isSchemeSupported(scheme)) { + throw new IllegalArgumentException("Unsupported scheme :" + scheme); + } + return Option.fromJavaOptional(Arrays.stream(StorageSchemes.values()) + .filter(s -> s.implementsStorageLock() && s.scheme.equals(scheme)).findFirst()); + } }