diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java new file mode 100644 index 0000000000000..ba9dfb9a3fad7 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageBasedLockProvider.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.HeartbeatManager; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockProviderHeartbeatManager; +import org.apache.hudi.client.transaction.lock.models.LockUpdateResult; +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.util.Functions; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StorageSchemes; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.util.Objects; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static org.apache.hudi.common.lock.LockState.ACQUIRED; +import static org.apache.hudi.common.lock.LockState.ACQUIRING; +import static org.apache.hudi.common.lock.LockState.FAILED_TO_ACQUIRE; +import static org.apache.hudi.common.lock.LockState.FAILED_TO_RELEASE; +import static org.apache.hudi.common.lock.LockState.RELEASED; +import static org.apache.hudi.common.lock.LockState.RELEASING; +import static org.apache.hudi.common.table.HoodieTableMetaClient.LOCKS_FOLDER_NAME; + +/** + * A distributed filesystem storage based lock provider. This {@link LockProvider} implementation + * leverages conditional writes to ensure transactional consistency for multi-writer scenarios. + * The underlying storage client interface {@link StorageLock} is pluggable so it can be implemented for any + * filesystem which supports conditional writes. + */ +@ThreadSafe +public class StorageBasedLockProvider implements LockProvider { + + public static final String DEFAULT_TABLE_LOCK_FILE_NAME = "table_lock"; + // How long to wait before retrying lock acquisition in blocking calls. + private static final long DEFAULT_LOCK_ACQUISITION_BUFFER_MS = 1000; + // Maximum expected clock drift between two nodes. + // This is similar idea as SkewAdjustingTimeGenerator. + // In reality, within a single cloud provider all nodes share the same ntp + // server + // therefore we do not expect drift more than a few ms. + // However, since our lock leases are pretty long, we can use a high buffer. + private static final long CLOCK_DRIFT_BUFFER_MS = 500; + + // When we retry lock upserts, do so 5 times + private static final long LOCK_UPSERT_RETRY_COUNT = 5; + + private static final Logger LOGGER = LoggerFactory.getLogger(StorageBasedLockProvider.class); + + // Use for testing + private final Logger logger; + + // The lock service implementation which interacts with storage + private final StorageLock lockService; + + private final long validitySeconds; + private final String ownerId; + private final String lockFilePath; + private final HeartbeatManager heartbeatManager; + private final transient Thread shutdownThread; + + @GuardedBy("this") + private StorageLockFile currentLockObj = null; + @GuardedBy("this") + private boolean isClosed = false; + + private synchronized void setLock(StorageLockFile lockObj) { + if (lockObj != null && !Objects.equals(lockObj.getOwner(), this.ownerId)) { + throw new HoodieLockException("Owners do not match. Current lock owner: " + this.ownerId + " lock path: " + + this.lockFilePath + " owner: " + lockObj.getOwner()); + } + this.currentLockObj = lockObj; + } + + /** + * Default constructor for StorageBasedLockProvider, required by LockManager + * to instantiate it using reflection. + * + * @param lockConfiguration The lock configuration, should be transformable into + * StorageBasedLockConfig + * @param conf Storage config, ignored. + */ + public StorageBasedLockProvider(final LockConfiguration lockConfiguration, final StorageConfiguration conf) { + this( + UUID.randomUUID().toString(), + lockConfiguration.getConfig(), + LockProviderHeartbeatManager::new, + getLockService(), + LOGGER); + } + + private static Functions.Function3 getLockService() { + return (ownerId, lockFilePath, lockConfig) -> { + try { + return (StorageLock) ReflectionUtils.loadClass( + getLockServiceClassName(new URI(lockFilePath).getScheme()), + new Class[]{String.class, String.class, String.class, Properties.class}, + new Object[]{ownerId, lockFilePath, lockConfig}); + } catch (Throwable e) { + throw new HoodieLockException("Failed to load and initialize StorageLock", e); + } + }; + } + + private static @NotNull String getLockServiceClassName(String scheme) { + Option schemeOptional = StorageSchemes.getStorageLockImplementationIfExists(scheme); + if (schemeOptional.isPresent()) { + return schemeOptional.get().getStorageLockClass(); + } else { + throw new HoodieNotSupportedException("No implementation of StorageLock supports this scheme: " + scheme); + } + } + + @VisibleForTesting + StorageBasedLockProvider( + String ownerId, + TypedProperties properties, + Functions.Function3, HeartbeatManager> heartbeatManagerLoader, + Functions.Function3 lockServiceLoader, + Logger logger) { + StorageBasedLockConfig config = new StorageBasedLockConfig.Builder().fromProperties(properties).build(); + long heartbeatPollSeconds = config.getHeartbeatPollSeconds(); + this.validitySeconds = config.getValiditySeconds(); + this.lockFilePath = String.format("%s%s%s", config.getHudiTableBasePath(), StoragePath.SEPARATOR, LOCKS_FOLDER_NAME); + this.heartbeatManager = heartbeatManagerLoader.apply(ownerId, heartbeatPollSeconds * 1000, this::renewLock); + this.lockService = lockServiceLoader.apply(ownerId, lockFilePath, properties); + this.ownerId = ownerId; + this.logger = logger; + shutdownThread = new Thread(() -> shutdown(true)); + Runtime.getRuntime().addShutdownHook(shutdownThread); + logger.info("Instantiated new storage-based lock provider, owner: {}, lockfilePath: {}", ownerId, lockFilePath); + } + + // ----------------------------------------- + // BASE METHODS + // ----------------------------------------- + + @Override + public synchronized StorageLockFile getLock() { + return currentLockObj; + } + + /** + * Attempts to acquire the lock within the given timeout. + */ + @Override + public boolean tryLock(long time, TimeUnit unit) { + long deadlineNanos = System.nanoTime() + unit.toNanos(time); + + while (System.nanoTime() < deadlineNanos) { + try { + logDebugLockState(ACQUIRING); + if (tryLock()) { + return true; + } + Thread.sleep(DEFAULT_LOCK_ACQUISITION_BUFFER_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new HoodieLockException(generateLockStateMessage(LockState.FAILED_TO_ACQUIRE), e); + } + } + + return false; + } + + @Override + public synchronized void close() { + shutdown(false); + } + + private synchronized void shutdown(boolean fromShutdownHook) { + if (fromShutdownHook) { + // Try to expire the lock from the shutdown hook. + if (!isClosed && actuallyHoldsLock()) { + tryExpireCurrentLock(true); + } + // Do not execute any further actions + return; + } else { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + try { + this.unlock(); + } catch (Exception e) { + logger.error("Owner {}: Failed to unlock current lock.", ownerId, e); + } + try { + this.lockService.close(); + } catch (Exception e) { + logger.error("Owner {}: Lock service failed to close.", ownerId, e); + } + try { + this.heartbeatManager.close(); + } catch (Exception e) { + logger.error("Owner {}: Heartbeat manager failed to close.", ownerId, e); + } + + this.isClosed = true; + } + + private synchronized boolean isLockStillValid(StorageLockFile lock) { + return !lock.isExpired() && !isCurrentTimeCertainlyOlderThanDistributedTime(lock.getValidUntil()); + } + + /** + * Attempts a single pass to acquire the lock (non-blocking). + * + * @return true if lock acquired, false otherwise + */ + @Override + public synchronized boolean tryLock() { + assertHeartbeatManagerExists(); + assertUnclosed(); + logDebugLockState(ACQUIRING); + if (actuallyHoldsLock()) { + // Supports reentrant locks + return true; + } + + if (this.heartbeatManager.hasActiveHeartbeat()) { + logger.error("Detected broken invariant: there is an active heartbeat without a lock being held."); + // Breach of object invariant - we should never have an active heartbeat without + // holding a lock. + throw new HoodieLockException(generateLockStateMessage(FAILED_TO_ACQUIRE)); + } + + Pair latestLock = this.lockService.readCurrentLockFile(); + if (latestLock.getLeft() == LockGetResult.UNKNOWN_ERROR) { + logInfoLockState(FAILED_TO_ACQUIRE, "Failed to get the latest lock status"); + // We were not able to determine whether a lock was present. + return false; + } + + if (latestLock.getLeft() == LockGetResult.SUCCESS && isLockStillValid(latestLock.getRight())) { + String msg = String.format("Lock already held by %s", latestLock.getRight().getOwner()); + // Lock held by others. + logInfoLockState(FAILED_TO_ACQUIRE, msg); + return false; + } + + // Try to acquire the lock + StorageLockData newLockData = new StorageLockData(false, System.currentTimeMillis() + validitySeconds, ownerId); + Pair lockUpdateStatus = this.lockService.tryCreateOrUpdateLockFile( + newLockData, latestLock.getLeft() == LockGetResult.NOT_EXISTS ? null : latestLock.getRight()); + if (lockUpdateStatus.getLeft() != LockUpdateResult.SUCCESS) { + // failed to acquire the lock, indicates concurrent contention + logInfoLockState(FAILED_TO_ACQUIRE); + return false; + } + this.setLock(lockUpdateStatus.getRight()); + + // There is a remote chance that + // - after lock is acquired but before heartbeat starts the lock is expired. + // - lock is acquired and heartbeat is up yet it does not run timely before the + // lock is expired + // It is mitigated by setting the lock validity period to a reasonably long + // period to survive until heartbeat comes, plus + // set the heartbeat interval relatively small enough. + if (!this.heartbeatManager.startHeartbeatForThread(Thread.currentThread())) { + // Precondition "no active heartbeat" is checked previously, so when + // startHeartbeatForThread returns false, + // we are confident no heartbeat thread is running. + logErrorLockState(RELEASING, "We were unable to start the heartbeat!"); + tryExpireCurrentLock(false); + return false; + } + + logInfoLockState(ACQUIRED); + return true; + } + + /** + * Determines whether this provider currently holds a valid lock. + * + *

+ * This method checks both the existence of a lock object and its validity. A + * lock is considered + * valid only if it exists and has not expired according to its timestamp. + * + * @return {@code true} if this provider holds a valid lock, {@code false} + * otherwise + */ + private boolean actuallyHoldsLock() { + return believesLockMightBeHeld() && isLockStillValid(getLock()); + } + + /** + * Checks if this provider has a non-null lock object reference. + * + *

+ * A non-null lock object indicates that this provider has previously + * **successfully** acquired a lock via + * StorageBasedLockProvider##lock and has not yet **successfully** released + * it via StorageBasedLockProvider#unlock(). + * It is merely an indicator that the lock might be held by this provider. To + * truly certify we are the owner of the lock, + * StorageBasedLockProvider#actuallyHoldsLock should be used. + * + * @return {@code true} if this provider has a non-null lock object, + * {@code false} otherwise + * @see StorageBasedLockProvider#actuallyHoldsLock() + */ + private boolean believesLockMightBeHeld() { + return this.getLock() != null; + } + + /** + * Unlock by marking our current lock file "expired": true. + */ + @Override + public synchronized void unlock() { + assertHeartbeatManagerExists(); + if (!believesLockMightBeHeld()) { + return; + } + boolean believesNoLongerHoldsLock = true; + + // Try to stop the heartbeat first + if (heartbeatManager.hasActiveHeartbeat()) { + logger.debug("Owner {}: Gracefully shutting down heartbeat.", ownerId); + believesNoLongerHoldsLock &= heartbeatManager.stopHeartbeat(true); + } + + // Then expire the current lock. + believesNoLongerHoldsLock &= tryExpireCurrentLock(false); + if (!believesNoLongerHoldsLock) { + throw new HoodieLockException(generateLockStateMessage(FAILED_TO_RELEASE)); + } + } + + private void assertHeartbeatManagerExists() { + if (heartbeatManager == null) { + // broken function precondition. + throw new HoodieLockException("Unexpected null heartbeatManager"); + } + } + + private void assertUnclosed() { + if (this.isClosed) { + throw new HoodieLockException("Lock provider already closed"); + } + } + + /** + * Tries to expire the currently held lock. + * @param fromShutdownHook Whether we are attempting best effort quick unlock from shutdown hook. + * @return True if we were successfully able to upload an expired lock. + */ + private synchronized boolean tryExpireCurrentLock(boolean fromShutdownHook) { + // It does not make sense to have heartbeat alive extending the lock lease while + // here we are trying + // to expire the lock. + if (!fromShutdownHook && heartbeatManager.hasActiveHeartbeat()) { + // broken function precondition. + throw new HoodieLockException("Must stop heartbeat before expire lock file"); + } + logDebugLockState(RELEASING); + // Upload metadata that will unlock this lock. + StorageLockData expiredLockData = new StorageLockData(true, this.getLock().getValidUntil(), ownerId); + Pair result; + if (fromShutdownHook) { + // Only try once for shutdown hook, then return immediately + result = this.lockService.tryCreateOrUpdateLockFile(expiredLockData, this.getLock()); + } else { + result = this.lockService.tryCreateOrUpdateLockFileWithRetry( + () -> expiredLockData, + this.getLock(), + // Keep retrying for the normal validity time. + LOCK_UPSERT_RETRY_COUNT); + } + switch (result.getLeft()) { + case UNKNOWN_ERROR: + // Here we do not know the state of the lock. + logErrorLockState(FAILED_TO_RELEASE, "Lock state is unknown."); + return false; + case SUCCESS: + logInfoLockState(RELEASED); + setLock(null); + return true; + case ACQUIRED_BY_OTHERS: + // As we are confident no lock is held by itself, clean up the cached lock object. + // However this is an edge case, so warn. + logWarnLockState(RELEASED, "lock should not have been acquired by others."); + setLock(null); + return true; + default: + throw new HoodieLockException("Unexpected lock update result: " + result.getLeft()); + } + } + + /** + * Renews (heartbeats) the current lock if we are the holder, it forcefully set + * the expiration flag + * to false and the lock expiration time to a later time in the future. + * @return True if we successfully renewed the lock, false if not. + */ + @VisibleForTesting + protected synchronized boolean renewLock() { + try { + // If we don't hold the lock, no-op. + if (!believesLockMightBeHeld()) { + logger.warn("Owner {}: Cannot renew, no lock held by this process", ownerId); + // No need to extend lock lease. + return false; + } + + long oldExpiration = getLock().getValidUntil(); + // Attempt conditional update, extend lock. There are 3 cases: + // 1. Happy case: lock has not expired yet, we extend the lease to a longer + // period. + // 2. Corner case 1: lock is expired and is acquired by others, lock renewal + // failed with ACQUIRED_BY_OTHERS. + // 3. Corner case 2: lock is expired but no one has acquired it yet, lock + // renewal "revived" the expired lock. + // Please note we expect the corner cases almost never happens. + // Action taken for corner case 2 is just a best effort mitigation. At least it + // prevents further data corruption by + // letting someone else acquire the lock. + Pair currentLock = this.lockService.tryCreateOrUpdateLockFileWithRetry( + () -> new StorageLockData(false, System.currentTimeMillis() + validitySeconds, ownerId), + getLock(), + LOCK_UPSERT_RETRY_COUNT); + switch (currentLock.getLeft()) { + case ACQUIRED_BY_OTHERS: + logger.error("Owner {}: Unable to renew lock as it is acquired by others.", ownerId); + // No need to extend lock lease anymore. + return false; + case UNKNOWN_ERROR: + // This could be transient, but unclear, we will let the heartbeat continue + // normally. + // If the next heartbeat run identifies our lock has expired we will error out. + logger.warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", ownerId); + // Let heartbeat retry later. + return true; + case SUCCESS: + // Only positive outcome + this.setLock(currentLock.getRight()); + logger.info("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}.", + ownerId, oldExpiration - System.currentTimeMillis(), lockFilePath); + // Let heartbeat continue to renew lock lease again later. + return true; + default: + throw new HoodieLockException("Unexpected lock update result: " + currentLock.getLeft()); + } + } catch (Exception e) { + logger.error("Owner {}: Exception occurred while renewing lock", ownerId, e); + return false; + } + } + + // --------- + // Utilities + // --------- + + /** + * Method to calculate whether a timestamp from a distributed source has + * definitively occurred yet. + */ + protected boolean isCurrentTimeCertainlyOlderThanDistributedTime(long epoch) { + return System.currentTimeMillis() > epoch + CLOCK_DRIFT_BUFFER_MS; + } + + private String generateLockStateMessage(LockState state) { + String threadName = Thread.currentThread().getName(); + return String.format("Owner %s: Lock file path %s, Thread %s, Storage based lock state %s", ownerId, + lockFilePath, threadName, state.toString()); + } + + private static final String LOCK_STATE_LOGGER_MSG = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}"; + private static final String LOCK_STATE_LOGGER_MSG_WITH_INFO = "Owner {}: Lock file path {}, Thread {}, Storage based lock state {}, {}"; + + private void logDebugLockState(LockState state) { + logger.debug(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, Thread.currentThread(), state); + } + + private void logInfoLockState(LockState state) { + logger.info(LOCK_STATE_LOGGER_MSG, ownerId, lockFilePath, Thread.currentThread(), state); + } + + private void logInfoLockState(LockState state, String msg) { + logger.info(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg); + } + + private void logWarnLockState(LockState state, String msg) { + logger.warn(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg); + } + + private void logErrorLockState(LockState state, String msg) { + logger.error(LOCK_STATE_LOGGER_MSG_WITH_INFO, ownerId, lockFilePath, Thread.currentThread(), state, msg); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java index 4beba39dc9579..be73bd7cb2c0e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockProviderHeartbeatManager.java @@ -35,7 +35,7 @@ /** * LockProviderHeartbeatManager is a helper class which handles the scheduling and stopping of heartbeat - * tasks. This is intended for use with the conditional write lock provider, which requires + * tasks. This is intended for use with the storage based lock provider, which requires * a separate thread to spawn and renew the lock repeatedly. * It should be responsible for the entire lifecycle of the heartbeat task. * Importantly, a new instance should be created for each lock provider. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java index 2a045f31aefc3..dfccf23d67edd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java @@ -35,7 +35,7 @@ public class StorageLockFile { private final StorageLockData data; private final String versionId; - // Get a custom object mapper. See ConditionalWriteLockData for required properties. + // Get a custom object mapper. See StorageLockData for required properties. private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() // This allows us to add new properties down the line. .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) @@ -43,7 +43,7 @@ public class StorageLockFile { .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES); /** - * Initializes a ConditionalWriteLockFile using the data and unique versionId. + * Initializes a StorageLockFile using the data and unique versionId. * * @param data The data in the lock file. * @param versionId The version of this lock file. Used to ensure consistency through conditional writes. @@ -56,11 +56,11 @@ public StorageLockFile(StorageLockData data, String versionId) { } /** - * Factory method to load an input stream into a ConditionalWriteLockFile. + * Factory method to load an input stream into a StorageLockFile. * * @param inputStream The input stream of the JSON content. * @param versionId The unique version identifier for the lock file. - * @return A new instance of ConditionalWriteLockFile. + * @return A new instance of StorageLockFile. * @throws HoodieIOException If deserialization fails. */ public static StorageLockFile createFromStream(InputStream inputStream, String versionId) { @@ -68,7 +68,7 @@ public static StorageLockFile createFromStream(InputStream inputStream, String v StorageLockData data = OBJECT_MAPPER.readValue(inputStream, StorageLockData.class); return new StorageLockFile(data, versionId); } catch (IOException e) { - throw new HoodieIOException("Failed to deserialize JSON content into ConditionalWriteLockData", e); + throw new HoodieIOException("Failed to deserialize JSON content into StorageLockData", e); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java new file mode 100644 index 0000000000000..bc5a52db71bf6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/StorageBasedLockConfig.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; + +public class StorageBasedLockConfig extends HoodieConfig { + private static final String SINCE_VERSION_1_0_2 = "1.0.2"; + private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX + + "storage."; + + public static final ConfigProperty VALIDITY_TIMEOUT_SECONDS = ConfigProperty + .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs") + .defaultValue(TimeUnit.MINUTES.toSeconds(5)) + .markAdvanced() + .sinceVersion(SINCE_VERSION_1_0_2) + .withDocumentation( + "For storage-based lock provider, the amount of time in seconds each new lock is valid for. " + + "The lock provider will attempt to renew its lock until it successfully extends the lock lease period " + + "or the validity timeout is reached."); + + public static final ConfigProperty HEARTBEAT_POLL_SECONDS = ConfigProperty + .key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs") + .defaultValue(30L) + .markAdvanced() + .sinceVersion(SINCE_VERSION_1_0_2) + .withDocumentation( + "For storage-based lock provider, the amount of time in seconds to wait before renewing the lock. " + + "Defaults to 30 seconds."); + + public long getValiditySeconds() { + return getLong(VALIDITY_TIMEOUT_SECONDS); + } + + public long getHeartbeatPollSeconds() { + return getLong(HEARTBEAT_POLL_SECONDS); + } + + public String getHudiTableBasePath() { + return getString(BASE_PATH); + } + + public static class Builder { + private final StorageBasedLockConfig lockConfig = new StorageBasedLockConfig(); + + public StorageBasedLockConfig build() { + lockConfig.setDefaults(StorageBasedLockConfig.class.getName()); + return lockConfig; + } + + public StorageBasedLockConfig.Builder fromProperties(TypedProperties props) { + lockConfig.getProps().putAll(props); + checkRequiredProps(); + return this; + } + + private void checkRequiredProps() { + String notExistsMsg = " does not exist!"; + if (!lockConfig.contains(BASE_PATH)) { + throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg); + } + if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) + * 3) { + throw new IllegalArgumentException( + VALIDITY_TIMEOUT_SECONDS.key() + " should be more than triple " + HEARTBEAT_POLL_SECONDS.key()); + } + if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 5) { + throw new IllegalArgumentException( + VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal to 5 seconds."); + } + if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) { + throw new IllegalArgumentException( + HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal to 1 second."); + } + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java new file mode 100644 index 0000000000000..9e5f4a6d689f8 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/TestStorageBasedLockProvider.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.client.transaction.lock.models.HeartbeatManager; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpdateResult; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.StorageBasedLockConfig; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.storage.StorageConfiguration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit test class for StorageBasedLockProvider + */ +class TestStorageBasedLockProvider { + private StorageBasedLockProvider lockProvider; + private StorageLock mockLockService; + private HeartbeatManager mockHeartbeatManager; + private Logger mockLogger; + private final String ownerId = UUID.randomUUID().toString(); + private static final int DEFAULT_LOCK_VALIDITY_MS = 5000; + + @BeforeEach + void setupLockProvider() { + mockLockService = mock(StorageLock.class); + mockHeartbeatManager = mock(HeartbeatManager.class); + mockLogger = mock(Logger.class); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + TypedProperties props = new TypedProperties(); + props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5"); + props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1"); + props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default"); + + lockProvider = spy(new StorageBasedLockProvider( + ownerId, + props, + (a,b,c) -> mockHeartbeatManager, + (a,b,c) -> mockLockService, + mockLogger)); + } + + @AfterEach + void cleanupLockProvider() { + lockProvider.close(); + } + + @Test + void testUnsupportedLockStorageLocation() { + TypedProperties props = new TypedProperties(); + props.put(BASE_PATH.key(), "hdfs://bucket/lake/db/tbl-default"); + LockConfiguration lockConf = new LockConfiguration(props); + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + HoodieLockException ex = assertThrows(HoodieLockException.class, + () -> new StorageBasedLockProvider(lockConf, storageConf)); + assertTrue(ex.getCause().getMessage().contains("No implementation of StorageLock supports this scheme")); + } + + @Test + void testValidLockStorageLocation() { + TypedProperties props = new TypedProperties(); + props.put(BASE_PATH.key(), "s3://bucket/lake/db/tbl-default"); + + LockConfiguration lockConf = new LockConfiguration(props); + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + + HoodieLockException ex = assertThrows(HoodieLockException.class, + () -> new StorageBasedLockProvider(lockConf, storageConf)); + assertTrue(ex.getMessage().contains("Failed to load and initialize StorageLock")); + } + + @ParameterizedTest + @ValueSource(strings = { "gs://bucket/lake/db/tbl-default", "s3://bucket/lake/db/tbl-default", + "s3a://bucket/lake/db/tbl-default" }) + void testNonExistentWriteServiceWithDefaults(String tableBasePathString) { + TypedProperties props = new TypedProperties(); + props.put(BASE_PATH.key(), tableBasePathString); + + LockConfiguration lockConf = new LockConfiguration(props); + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + + HoodieLockException ex = assertThrows(HoodieLockException.class, + () -> new StorageBasedLockProvider(lockConf, storageConf)); + assertTrue(ex.getMessage().contains("Failed to load and initialize StorageLock")); + } + + @Test + void testInvalidLocksLocationForWriteService() { + TypedProperties props = new TypedProperties(); + props.put(BASE_PATH.key(), "gs://bucket/lake/db/tbl-default"); + props.put(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5"); + props.put(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1"); + + LockConfiguration lockConf = new LockConfiguration(props); + StorageConfiguration storageConf = HoodieTestUtils.getDefaultStorageConf(); + + HoodieLockException ex = assertThrows(HoodieLockException.class, + () -> new StorageBasedLockProvider(lockConf, storageConf)); + Throwable cause = ex.getCause(); + assertNotNull(cause); + assertInstanceOf(HoodieNotSupportedException.class, cause); + } + + @Test + void testTryLockForTimeUnitThrowsOnInterrupt() throws Exception { + doReturn(false).when(lockProvider).tryLock(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + lockProvider.tryLock(1, TimeUnit.SECONDS); + } catch (HoodieLockException e) { + latch.countDown(); + } + }); + t.start(); + Thread.sleep(50); + t.interrupt(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + void testTryLockForTimeUnitAcquiresLockEventually() throws Exception { + AtomicInteger count = new AtomicInteger(0); + doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + assertTrue(lockProvider.tryLock(4, TimeUnit.SECONDS)); + latch.countDown(); + }); + t.start(); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + @Test + void testTryLockForTimeUnitFailsToAcquireLockEventually() throws Exception { + AtomicInteger count = new AtomicInteger(0); + doAnswer(inv -> count.incrementAndGet() > 2).when(lockProvider).tryLock(); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + assertFalse(lockProvider.tryLock(1, TimeUnit.SECONDS)); + latch.countDown(); + }); + t.start(); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + void testTryLockSuccess() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + assertEquals(realLockFile, lockProvider.getLock()); + verify(mockLockService, atLeastOnce()).tryCreateOrUpdateLockFile(any(), any()); + } + + @Test + void testTryLockSuccessButFailureToStartHeartbeat() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(false); + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(realLockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + + boolean acquired = lockProvider.tryLock(); + assertFalse(acquired); + } + + @Test + void testTryLockFailsFromOwnerMismatch() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockFile returnedLockFile = new StorageLockFile( + new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, "different-owner"), "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, returnedLockFile)); + + HoodieLockException ex = assertThrows(HoodieLockException.class, () -> lockProvider.tryLock()); + assertTrue(ex.getMessage().contains("Owners do not match")); + } + + @Test + void testTryLockFailsDueToExistingLock() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, + "other-owner"); + StorageLockFile existingLock = new StorageLockFile(data, "v2"); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS, existingLock)); + + boolean acquired = lockProvider.tryLock(); + assertFalse(acquired); + } + + @Test + void testTryLockFailsToUpdateFile() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.ACQUIRED_BY_OTHERS, null)); + assertFalse(lockProvider.tryLock()); + } + + @Test + void testTryLockFailsDueToUnknownState() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.UNKNOWN_ERROR, null)); + assertFalse(lockProvider.tryLock()); + } + + @Test + void testTryLockSucceedsWhenExistingLockExpiredByTime() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, + "other-owner"); + StorageLockFile existingLock = new StorageLockFile(data, "v2"); + StorageLockData newData = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile realLockFile = new StorageLockFile(newData, "v1"); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.SUCCESS, existingLock)); + when(mockLockService.tryCreateOrUpdateLockFile(any(), eq(existingLock))) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + } + + @Test + void testTryLockReentrancySucceeds() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + // Re-entrancy succeeds + assertTrue(lockProvider.tryLock()); + } + + @Test + void testTryLockReentrancyAfterLockExpiredByTime() { + // In an extremely unlikely scenario, we could have a local reference to a lock + // which is present but expired, + // and because we were unable to stop the heartbeat properly, we did not + // successfully set it to null. + // Due to the nature of the heartbeat manager, this is expected to introduce + // some delay, but not be permanently blocking. + // There are a few variations of this edge case, so we must test them all. + + // Here the lock is still "unexpired" but the time shows expired. + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile expiredLock = new StorageLockFile(data, "v1"); + doReturn(expiredLock).when(lockProvider).getLock(); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile validLock = new StorageLockFile(validData, "v2"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, validLock)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + assertTrue(lockProvider.tryLock()); + } + + @Test + void testTryLockReentrancyAfterLockSetExpired() { + // In an extremely unlikely scenario, we could have a local reference to a lock + // which is present but expired, + // and because we were unable to stop the heartbeat properly, we did not + // successfully set it to null. + // Due to the nature of the heartbeat manager, this is expected to introduce + // some delay, but not be permanently blocking. + // There are a few variations of this edge case, so we must test them all. + + // Here the lock is "expired" but the time shows unexpired. + StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile expiredLock = new StorageLockFile(data, "v1"); + doReturn(expiredLock).when(lockProvider).getLock(); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData validData = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile validLock = new StorageLockFile(validData, "v2"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, validLock)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + assertTrue(lockProvider.tryLock()); + } + + @Test + void testTryLockHeartbeatStillActive() { + // In an extremely unlikely scenario, we could have a local reference to a lock + // which is present but expired, + // and because we were unable to stop the heartbeat properly, we did not + // successfully set it to null. + // Due to the nature of the heartbeat manager, this is expected to introduce + // some delay, but not be permanently blocking. + // There are a few variations of this edge case, so we must test them all. + + // Here the heartbeat is still active, so we have to error out. + StorageLockData data = new StorageLockData(true, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile expiredLock = new StorageLockFile(data, "v1"); + doReturn(expiredLock).when(lockProvider).getLock(); + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + assertThrows(HoodieLockException.class, () -> lockProvider.tryLock()); + } + + @Test + void testUnlockSucceedsAndReentrancy() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(true); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(realLockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, + new StorageLockFile(new StorageLockData(true, data.getValidUntil(), ownerId), "v2"))); + assertTrue(lockProvider.tryLock()); + when(mockHeartbeatManager.hasActiveHeartbeat()) + .thenReturn(true) // when we try to stop the heartbeat we will check if heartbeat is active return true. + .thenReturn(false); // when try to set lock to expire we will assert no active heartbeat as a precondition. + lockProvider.unlock(); + assertNull(lockProvider.getLock()); + lockProvider.unlock(); + } + + @Test + void testUnlockFailsToStopHeartbeat() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + assertThrows(HoodieLockException.class, () -> lockProvider.unlock()); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + } + + @Test + void testCloseFailsToStopHeartbeat() { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + assertTrue(lockProvider.tryLock()); + when(mockHeartbeatManager.stopHeartbeat(true)).thenReturn(false); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + // Should wrap the exception and log error. + lockProvider.close(); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + } + + @Test + void testRenewLockReturnsFalseWhenNoLockHeld() { + doReturn(null).when(lockProvider).getLock(); + assertFalse(lockProvider.renewLock()); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(true); + verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this process", this.ownerId); + } + + @Test + void testRenewLockWithoutHoldingLock() { + doReturn(null).when(lockProvider).getLock(); + assertFalse(lockProvider.renewLock()); + when(mockHeartbeatManager.hasActiveHeartbeat()).thenReturn(false); + verify(mockLogger).warn("Owner {}: Cannot renew, no lock held by this process", this.ownerId); + } + + @Test + void testRenewLockWithFullyExpiredLock() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() - DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile nearExpiredLockFile = new StorageLockFile(data, "v1"); + doReturn(nearExpiredLockFile).when(lockProvider).getLock(); + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(nearExpiredLockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.ACQUIRED_BY_OTHERS, null)); + assertFalse(lockProvider.renewLock()); + verify(mockLogger).error("Owner {}: Unable to renew lock as it is acquired by others.", this.ownerId); + } + + @Test + void testRenewLockUnableToUpsertLockFileButNotFatal() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + // Signal the upsert attempt failed, but may be transient. See interface for + // more details. + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(lockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.UNKNOWN_ERROR, null)); + assertTrue(lockProvider.renewLock()); + } + + @Test + void testRenewLockUnableToUpsertLockFileFatal() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + // Signal the upsert attempt failed, but may be transient. See interface for + // more details. + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(lockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.UNKNOWN_ERROR, null)); + // renewLock return true so it will be retried. + assertTrue(lockProvider.renewLock()); + + verify(mockLogger).warn("Owner {}: Unable to renew lock due to unknown error, could be transient.", this.ownerId); + } + + @Test + void testRenewLockSucceedsButRenewalWithinExpirationWindow() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + + StorageLockData nearExpirationData = new StorageLockData(false, System.currentTimeMillis(), ownerId); + StorageLockFile lockFileNearExpiration = new StorageLockFile(nearExpirationData, "v2"); + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(lockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, lockFileNearExpiration)); + + // We used to fail in this case before, but since we are only modifying a single + // lock file, this is ok now. + // Therefore, this can be a happy path variation. + assertTrue(lockProvider.renewLock()); + } + + @Test + void testRenewLockSucceeds() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + + StorageLockData successData = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, + ownerId); + StorageLockFile successLockFile = new StorageLockFile(successData, "v2"); + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(lockFile), anyLong())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, successLockFile)); + assertTrue(lockProvider.renewLock()); + + verify(mockLogger).info( + eq("Owner {}: Lock renewal successful. The renewal completes {} ms before expiration for lock {}."), + eq(this.ownerId), anyLong(), eq("gs://bucket/lake/db/tbl-default/.hoodie/.locks")); + } + + @Test + void testRenewLockFails() { + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile lockFile = new StorageLockFile(data, "v1"); + doReturn(lockFile).when(lockProvider).getLock(); + + when(mockLockService.tryCreateOrUpdateLockFileWithRetry(any(), eq(lockFile), anyLong())) + .thenThrow(new RuntimeException("Failure")); + assertFalse(lockProvider.renewLock()); + + verify(mockLogger).error(eq("Owner {}: Exception occurred while renewing lock"), eq(ownerId), + any(RuntimeException.class)); + } + + @Test + void testCloseCallsDependencies() throws Exception { + lockProvider.close(); + verify(mockLockService, atLeastOnce()).close(); + verify(mockHeartbeatManager, atLeastOnce()).close(); + assertNull(lockProvider.getLock()); + } + + @Test + void testCloseWithErrorForLockService() throws Exception { + doThrow(new RuntimeException("Some failure")).when(mockLockService).close(); + lockProvider.close(); + verify(mockLogger).error(eq("Owner {}: Lock service failed to close."), eq(ownerId), any(RuntimeException.class)); + assertNull(lockProvider.getLock()); + } + + @Test + void testCloseWithErrorForHeartbeatManager() throws Exception { + doThrow(new RuntimeException("Some failure")).when(mockHeartbeatManager).close(); + lockProvider.close(); + verify(mockLogger).error(eq("Owner {}: Heartbeat manager failed to close."), eq(ownerId), + any(RuntimeException.class)); + assertNull(lockProvider.getLock()); + } + + @Test + public void testShutdownHookViaReflection() throws Exception { + when(mockLockService.readCurrentLockFile()).thenReturn(Pair.of(LockGetResult.NOT_EXISTS, null)); + StorageLockData data = new StorageLockData(false, System.currentTimeMillis() + DEFAULT_LOCK_VALIDITY_MS, ownerId); + StorageLockFile realLockFile = new StorageLockFile(data, "v1"); + when(mockLockService.tryCreateOrUpdateLockFile(any(), isNull())) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + when(mockHeartbeatManager.startHeartbeatForThread(any())).thenReturn(true); + + boolean acquired = lockProvider.tryLock(); + assertTrue(acquired); + assertEquals(realLockFile, lockProvider.getLock()); + verify(mockLockService, atLeastOnce()).tryCreateOrUpdateLockFile(any(), any()); + + when(mockLockService.tryCreateOrUpdateLockFile(any(StorageLockData.class), eq(realLockFile))) + .thenReturn(Pair.of(LockUpdateResult.SUCCESS, realLockFile)); + + // Mock shutdown + Method shutdownMethod = lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class); + shutdownMethod.setAccessible(true); + shutdownMethod.invoke(lockProvider, true); + + // Verify that the expected shutdown behaviors occurred. + assertNull(lockProvider.getLock()); + // We do not execute additional actions + verify(mockLockService, never()).close(); + verify(mockHeartbeatManager, never()).close(); + } + + @Test + public void testShutdownHookWhenNoLockPresent() throws Exception { + // Now, when calling shutdown(true), the method should immediately return. + Method shutdownMethod = lockProvider.getClass().getDeclaredMethod("shutdown", boolean.class); + shutdownMethod.setAccessible(true); + shutdownMethod.invoke(lockProvider, true); + + // Verify that unlock or close methods are NOT invoked, or adjust expectations accordingly. + verify(mockLockService, never()).close(); + verify(mockHeartbeatManager, never()).close(); + } + + public static class StubStorageLock implements StorageLock { + public StubStorageLock(String arg1, String arg2, String arg3) { + // No-op constructor for reflection + } + + @Override + public Pair tryCreateOrUpdateLockFile(StorageLockData newLockData, + StorageLockFile previousLockFile) { + return null; + } + + @Override + public Pair tryCreateOrUpdateLockFileWithRetry( + Supplier newLockDataSupplier, + StorageLockFile previousLockFile, + long retryExpiration) { + return null; + } + + @Override + public Pair readCurrentLockFile() { + return null; + } + + @Override + public void close() throws Exception { + // stub, no-op + } + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java new file mode 100644 index 0000000000000..3be7e5550b253 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestStorageBasedLockConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.TypedProperties; + +import org.junit.jupiter.api.Test; + +import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestStorageBasedLockConfig { + + @Test + void testDefaultValues() { + // Asserts that the defaults are correct + TypedProperties props = new TypedProperties(); + props.setProperty(BASE_PATH.key(), "s3://hudi-bucket/table/basepath"); + + StorageBasedLockConfig.Builder builder = new StorageBasedLockConfig.Builder(); + StorageBasedLockConfig config = builder + .fromProperties(props) + .build(); + + assertEquals(5 * 60, config.getValiditySeconds(), "Default lock validity should be 5 minutes"); + assertEquals(30, config.getHeartbeatPollSeconds(), "Default heartbeat poll time should be 30 seconds"); + } + + @Test + void testCustomValues() { + // Testing that custom values which differ from defaults can be read properly + TypedProperties props = new TypedProperties(); + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "120"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "10"); + props.setProperty(BASE_PATH.key(), "/hudi/table/basepath"); + + StorageBasedLockConfig config = new StorageBasedLockConfig.Builder() + .fromProperties(props) + .build(); + + assertEquals(120, config.getValiditySeconds()); + assertEquals(10, config.getHeartbeatPollSeconds()); + assertEquals("/hudi/table/basepath", config.getHudiTableBasePath()); + } + + @Test + void testBasePathPropertiesValidation() { + // Tests that validations around the base path are present. + TypedProperties props = new TypedProperties(); + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "120"); + StorageBasedLockConfig.Builder propsBuilder = new StorageBasedLockConfig.Builder(); + + // Missing base path + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(BASE_PATH.key())); + } + + @Test + void testTimeThresholds() { + // Ensure that validations which restrict the time-based inputs are working. + TypedProperties props = new TypedProperties(); + props.setProperty(BASE_PATH.key(), "/hudi/table/basepath"); + // Invalid config case: validity timeout is less than triple of heartbeat poll period + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "3"); + StorageBasedLockConfig.Builder propsBuilder = new StorageBasedLockConfig.Builder(); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key())); + // Invalid config case: validity timeout is less than 5 seconds + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "4"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "1"); + exception = assertThrows(IllegalArgumentException.class, () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key())); + // Invalid config case: heartbeat poll period is less than 1 second + props.setProperty(StorageBasedLockConfig.VALIDITY_TIMEOUT_SECONDS.key(), "5"); + props.setProperty(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key(), "0"); + exception = assertThrows(IllegalArgumentException.class, () -> propsBuilder.fromProperties(props)); + assertTrue(exception.getMessage().contains(StorageBasedLockConfig.HEARTBEAT_POLL_SECONDS.key())); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 4c0e947d73b17..c42024f42ebad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -131,6 +131,8 @@ public class HoodieTableMetaClient implements Serializable { public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + StoragePath.SEPARATOR + ".fileids"; + public static final String LOCKS_FOLDER_NAME = METAFOLDER_NAME + StoragePath.SEPARATOR + ".locks"; + public static final String SCHEMA_FOLDER_NAME = ".schema"; public static final String MARKER_EXTN = ".marker"; diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java index e718bc21ed6b8..25fe3e34c6fdc 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/fs/TestStorageSchemes.java @@ -39,6 +39,15 @@ public void testStorageSchemes() { assertTrue(StorageSchemes.isSchemeSupported("afs")); assertFalse(StorageSchemes.isSchemeSupported("s2")); + for (StorageSchemes scheme : StorageSchemes.values()) { + String schemeName = scheme.getScheme(); + if (scheme.getScheme().startsWith("s3")) { + assertTrue(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent()); + } else { + assertFalse(StorageSchemes.getStorageLockImplementationIfExists(schemeName).isPresent()); + } + } + assertTrue(StorageSchemes.isAtomicCreationSupported("file")); assertTrue(StorageSchemes.isAtomicCreationSupported("hdfs")); assertFalse(StorageSchemes.isAtomicCreationSupported("afs")); diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java index ca795917f98f8..aabb50104eb45 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/StorageSchemes.java @@ -19,6 +19,9 @@ package org.apache.hudi.storage; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; + import java.util.Arrays; import java.util.Set; import java.util.HashSet; @@ -28,65 +31,66 @@ */ public enum StorageSchemes { // Local filesystem - FILE("file", false, true), + FILE("file", false, true, null), // Hadoop File System - HDFS("hdfs", false, true), + HDFS("hdfs", false, true, null), // Baidu Advanced File System - AFS("afs", null, null), + AFS("afs", null, null, null), // Mapr File System - MAPRFS("maprfs", null, null), + MAPRFS("maprfs", null, null, null), // Apache Ignite FS - IGNITE("igfs", null, null), + IGNITE("igfs", null, null, null), // AWS S3 - S3A("s3a", true, null), - S3("s3", true, null), + S3A("s3a", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLock"), + S3("s3", true, null, "org.apache.hudi.aws.transaction.lock.S3StorageLock"), // Google Cloud Storage - GCS("gs", true, null), + GCS("gs", true, null, null), // Azure WASB - WASB("wasb", null, null), - WASBS("wasbs", null, null), + WASB("wasb", null, null, null), + WASBS("wasbs", null, null, null), // Azure ADLS - ADL("adl", null, null), + ADL("adl", null, null, null), // Azure ADLS Gen2 - ABFS("abfs", null, null), - ABFSS("abfss", null, null), + ABFS("abfs", null, null, null), + ABFSS("abfss", null, null, null), // Aliyun OSS - OSS("oss", null, null), - // View FS for federated setups. If federating across cloud stores, then append support is false + OSS("oss", null, null, null), + // View FS for federated setups. If federating across cloud stores, then append + // support is false // View FS support atomic creation - VIEWFS("viewfs", null, true), - //ALLUXIO - ALLUXIO("alluxio", null, null), + VIEWFS("viewfs", null, true, null), + // ALLUXIO + ALLUXIO("alluxio", null, null, null), // Tencent Cloud Object Storage - COSN("cosn", null, null), + COSN("cosn", null, null, null), // Tencent Cloud HDFS - CHDFS("ofs", null, null), + CHDFS("ofs", null, null, null), // Tencent Cloud CacheFileSystem - GOOSEFS("gfs", null, null), + GOOSEFS("gfs", null, null, null), // Databricks file system - DBFS("dbfs", null, null), + DBFS("dbfs", null, null, null), // IBM Cloud Object Storage - COS("cos", null, null), + COS("cos", null, null, null), // Huawei Cloud Object Storage - OBS("obs", null, null), + OBS("obs", null, null, null), // Kingsoft Standard Storage ks3 - KS3("ks3", null, null), + KS3("ks3", null, null, null), // Netease Object Storage nos - NOS("nos", null, null), + NOS("nos", null, null, null), // JuiceFileSystem - JFS("jfs", null, null), + JFS("jfs", null, null, null), // Baidu Object Storage - BOS("bos", null, null), + BOS("bos", null, null, null), // Oracle Cloud Infrastructure Object Storage - OCI("oci", null, null), + OCI("oci", null, null, null), // Volcengine Object Storage - TOS("tos", null, null), + TOS("tos", null, null, null), // Volcengine Cloud HDFS - CFS("cfs", null, null), + CFS("cfs", null, null, null), // Aliyun Apsara File Storage for HDFS - DFS("dfs", false, true), + DFS("dfs", false, true, null), // Hopsworks File System - HOPSFS("hopsfs", false, true); + HOPSFS("hopsfs", false, true, null); // list files may bring pressure to storage with centralized meta service like HDFS. // when we want to get only part of files under a directory rather than all files, use getStatus may be more friendly than listStatus. @@ -98,11 +102,17 @@ public enum StorageSchemes { private final Boolean isWriteTransactional; // null for uncertain if dfs support atomic create&delete, please update this for each FS private final Boolean supportAtomicCreation; + private final String storageLockClass; - StorageSchemes(String scheme, Boolean isWriteTransactional, Boolean supportAtomicCreation) { + StorageSchemes( + String scheme, + Boolean isWriteTransactional, + Boolean supportAtomicCreation, + String storageLockClass) { this.scheme = scheme; this.isWriteTransactional = isWriteTransactional; this.supportAtomicCreation = supportAtomicCreation; + this.storageLockClass = storageLockClass; } public String getScheme() { @@ -117,6 +127,14 @@ public boolean isAtomicCreationSupported() { return supportAtomicCreation != null && supportAtomicCreation; } + public boolean implementsStorageLock() { + return !StringUtils.isNullOrEmpty(storageLockClass); + } + + public String getStorageLockClass() { + return storageLockClass; + } + public static boolean isSchemeSupported(String scheme) { return Arrays.stream(values()).anyMatch(s -> s.getScheme().equals(scheme)); } @@ -143,4 +161,12 @@ public static boolean isListStatusFriendly(String scheme) { return LIST_STATUS_FRIENDLY_SCHEMES.contains(scheme); } + + public static Option getStorageLockImplementationIfExists(String scheme) { + if (!isSchemeSupported(scheme)) { + throw new IllegalArgumentException("Unsupported scheme :" + scheme); + } + return Option.fromJavaOptional(Arrays.stream(StorageSchemes.values()) + .filter(s -> s.implementsStorageLock() && s.scheme.equals(scheme)).findFirst()); + } }