From ead32f2ed26d95c9f8991c2263b34e0b7c7c4e5e Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Fri, 24 Jan 2025 04:12:24 -0800 Subject: [PATCH 1/2] HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap Change-Id: I97e0e0e28736cc51bfb23c45d3750cf721a9689e --- .../src/main/resources/ozone-default.xml | 10 + .../apache/hadoop/ozone/om/OMConfigKeys.java | 6 + .../hadoop/ozone/om/OmSnapshotManager.java | 8 +- .../ozone/om/snapshot/ReferenceCounted.java | 14 +- .../om/snapshot/ReferenceCountedCallback.java | 4 +- .../ozone/om/snapshot/SnapshotCache.java | 231 +++++++++++++++--- .../ozone/om/snapshot/TestSnapshotCache.java | 2 +- .../om/snapshot/TestSnapshotDiffManager.java | 2 +- 8 files changed, 236 insertions(+), 41 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index dfd058f5d709..25560e5f86d5 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4494,6 +4494,16 @@ + + ozone.om.snapshot.cache.lock.timeout + 5s + OZONE, OM + + Wait Timeout for snapshot cache lock. + Uses milliseconds by default when no time unit is specified. 
+ + + ozone.om.snapshot.load.native.lib true diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index e274d822b63e..cfcc31756e51 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -599,6 +599,12 @@ private OMConfigKeys() { public static final long OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(1); + public static final String + OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT + = "ozone.om.snapshot.cache.lock.timeout"; + public static final long + OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT_DEFAULT + = TimeUnit.SECONDS.toMillis(5); public static final long OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(1); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index f817625a9796..277ad9ef0821 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -88,6 +88,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE; import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES; @@ -282,8 +284,12 @@ public OmSnapshotManager(OzoneManager ozoneManager) { .getTimeDuration(OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL, OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + long cacheLockTimeout = ozoneManager.getConfiguration() + .getTimeDuration(OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT, + OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(), - cacheCleanupServiceInterval); + cacheCleanupServiceInterval, cacheLockTimeout); this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ, ozoneManager, snapDiffJobCf, snapDiffReportCf, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java index 97e19eb969d8..42a1e9927883 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java @@ -18,7 +18,9 @@ package org.apache.hadoop.ozone.om.snapshot; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -51,10 +53,10 @@ public class ReferenceCounted /** * Parent instance whose callback will be triggered upon this RC closure. 
*/ - private final ReferenceCountedCallback parentWithCallback; + private final ReferenceCountedCallback parentWithCallback; public ReferenceCounted(T obj, boolean disableCounter, - ReferenceCountedCallback parentWithCallback) { + ReferenceCountedCallback parentWithCallback) { // A param to allow disabling ref counting to reduce active DB // access penalties due to AtomicLong operations. this.obj = obj; @@ -126,9 +128,7 @@ public long decrementRefCount() { Preconditions.checkState(newValTotal >= 0L, "Total reference count underflow"); } - if (refCount.get() == 0) { - this.parentWithCallback.callback(this); - } + this.parentWithCallback.callback(this); return refCount.get(); } @@ -161,4 +161,8 @@ public void close() { // so it is eligible to be used with try-with-resources. decrementRefCount(); } + + public Map getThreadCntMap() { + return ImmutableMap.copyOf(threadMap); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java index d63f5783b808..62e542e05fe1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java @@ -20,6 +20,6 @@ /** * Callback interface for ReferenceCounted. 
*/ -public interface ReferenceCountedCallback { - void callback(ReferenceCounted referenceCounted); +public interface ReferenceCountedCallback { + void callback(ReferenceCounted referenceCounted); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java index 035fc80d3468..0a383f581262 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheLoader; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -27,17 +28,23 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND; /** * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances. 
*/ -public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable { +public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable { static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class); @@ -46,9 +53,7 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable { // Value: OmSnapshot instance, each holds a DB instance handle inside // TODO: [SNAPSHOT] Consider wrapping SoftReference<> around IOmMetadataReader private final ConcurrentHashMap> dbMap; - private final CacheLoader cacheLoader; - // Soft-limit of the total number of snapshot DB instances allowed to be // opened on the OM. private final int cacheSizeLimit; @@ -58,14 +63,31 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable { "SnapshotCacheCleanupService"; private final OMMetrics omMetrics; + private final ReadWriteLock lock; + private final Lock readLock; + private final Lock writeLock; + private int lockCnt; + private final Condition lockReleased; + private final Condition dbClosed; + private final long lockTimeout; + private final Map> snapshotRefThreadIds; + private AtomicBoolean closed; public SnapshotCache(CacheLoader cacheLoader, int cacheSizeLimit, OMMetrics omMetrics, - long cleanupInterval) { + long cleanupInterval, long lockTimeout) { this.dbMap = new ConcurrentHashMap<>(); this.cacheLoader = cacheLoader; this.cacheSizeLimit = cacheSizeLimit; this.omMetrics = omMetrics; this.pendingEvictionQueue = ConcurrentHashMap.newKeySet(); + this.lock = new ReentrantReadWriteLock(); + this.readLock = lock.readLock(); + this.writeLock = lock.writeLock(); + this.snapshotRefThreadIds = new ConcurrentHashMap<>(); + this.lockTimeout = lockTimeout; + this.lockCnt = 0; + this.lockReleased = this.readLock.newCondition(); + this.dbClosed = this.writeLock.newCondition(); if (cleanupInterval > 0) { this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE, true, 1); @@ -97,10 +119,21 @@ public void invalidate(UUID 
key) { if (v == null) { LOG.warn("SnapshotId: '{}' does not exist in snapshot cache.", k); } else { + readLock.lock(); try { + if (lockCnt > 0) { + for (Long tid : snapshotRefThreadIds.keySet()) { + snapshotRefThreadIds.computeIfPresent(tid, (threadId, map) -> { + map.computeIfPresent(key, (uuid, val) -> null); + return map.isEmpty() ? null : map; + }); + } + } v.get().close(); } catch (IOException e) { throw new IllegalStateException("Failed to close snapshotId: " + key, e); + } finally { + readLock.unlock(); } omMetrics.decNumSnapshotCacheSize(); } @@ -119,12 +152,119 @@ public void invalidateAll() { @Override public void close() { + closed.set(true); invalidateAll(); if (this.scheduler != null) { this.scheduler.close(); } } + /** + * Decreases the lock count. When the count reaches zero all new threads would be able to get a handle of snapshot. + */ + private void decrementLockCount() { + lockCnt -= 1; + if (lockCnt <= 0) { + LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt); + lockCnt = 0; + } + if (lockCnt == 0) { + lockReleased.signalAll(); + snapshotRefThreadIds.clear(); + } + } + + /** + * Releases a lock on the cache. + */ + public void releaseLock() { + writeLock.lock(); + try { + decrementLockCount(); + } finally { + writeLock.unlock(); + } + } + + /** + * Acquires lock on the cache within max amount time. + * @param timeout Max time to wait to acquire lock. + * @return true if lock is acquired otherwise false. 
+ * @throws InterruptedException + */ + public boolean tryAcquire(long timeout) throws InterruptedException { + long endTime = System.currentTimeMillis() + timeout; + if (timeout <= 0) { + endTime = Long.MAX_VALUE; + timeout = Long.MAX_VALUE; + } + if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + try { + lockCnt += 1; + if (lockCnt == 1) { + snapshotRefThreadIds.clear(); + dbMap.values().stream() + .flatMap(referenceCounted -> + referenceCounted.getThreadCntMap().entrySet().stream().map(entry -> Pair.of(entry, referenceCounted))) + .forEach(entry -> updateThreadCnt(entry.getKey().getKey(), entry.getValue().get().getSnapshotID(), + entry.getKey().getValue())); + } + while (!snapshotRefThreadIds.isEmpty()) { + long currentTime = System.currentTimeMillis(); + if (currentTime >= endTime) { + // If and release acquired lock + decrementLockCount(); + return false; + } + dbClosed.await(Math.min(endTime - currentTime, lockTimeout), TimeUnit.MILLISECONDS); + } + } finally { + writeLock.unlock(); + } + invalidateAll(); + return true; + } + return false; + } + + private void updateThreadCnt(long threadId, UUID key, long cnt) { + snapshotRefThreadIds.compute(threadId, (tid, countMap) -> { + if (countMap == null) { + if (cnt <= 0) { + return null; + } + countMap = new ConcurrentHashMap<>(); + } + countMap.compute(key, (snapId, count) -> { + if (count == null) { + count = 0L; + } + count += cnt; + return count > 0 ? count : null; + }); + return countMap.isEmpty() ? null : countMap; + }); + } + + /** + * Waits for lock to be released. This function doesn't wait for the lock if the thread already has a few snapshots + * open. It only waits if the thread is reading it's first snapshot. 
+ * @param threadId + * @throws IOException if the thread is interrupted while waiting for the lock to be released + */ + private void waitForLock(long threadId) throws IOException { + if (!snapshotRefThreadIds.containsKey(threadId)) { + while (lockCnt > 0) { + try { + lockReleased.await(lockTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new IOException("Error while waiting for locks to be released.", e); + } + + } + } + } + /** * State the reason the current thread is getting the OmSnapshot instance. * Unused for now. @@ -150,33 +290,50 @@ public ReferenceCounted get(UUID key) throws IOException { } // Atomic operation to initialize the OmSnapshot instance (once) if the key // does not exist, and increment the reference count on the instance. - ReferenceCounted rcOmSnapshot = - dbMap.compute(key, (k, v) -> { - if (v == null) { - LOG.info("Loading SnapshotId: '{}'", k); - try { - v = new ReferenceCounted<>(cacheLoader.load(key), false, this); - } catch (OMException omEx) { - // Return null if the snapshot is no longer active - if (!omEx.getResult().equals(FILE_NOT_FOUND)) { - throw new IllegalStateException(omEx); + ReferenceCounted rcOmSnapshot; + readLock.lock(); + try { + long threadId = Thread.currentThread().getId(); + waitForLock(threadId); + rcOmSnapshot = + dbMap.compute(key, (k, v) -> { + if (v == null) { + if (closed.get()) { + return v; + } + LOG.info("Loading SnapshotId: '{}'", k); + try { + v = new ReferenceCounted<>(cacheLoader.load(key), false, this); + } catch (OMException omEx) { + // Return null if the snapshot is no longer active + if (!omEx.getResult().equals(FILE_NOT_FOUND)) { + throw new IllegalStateException(omEx); + } + } catch (IOException ioEx) { + // Failed to load snapshot DB + throw new IllegalStateException(ioEx); + } catch (Exception ex) { + // Unexpected and unknown exception thrown from CacheLoader#load + throw new IllegalStateException(ex); } - } catch (IOException ioEx) { - // Failed to load snapshot DB - throw new
IllegalStateException(ioEx); - } catch (Exception ex) { - // Unexpected and unknown exception thrown from CacheLoader#load - throw new IllegalStateException(ex); + omMetrics.incNumSnapshotCacheSize(); } - omMetrics.incNumSnapshotCacheSize(); - } - if (v != null) { - // When RC OmSnapshot is successfully loaded - v.incrementRefCount(); - } - return v; - }); + if (v != null) { + // When RC OmSnapshot is successfully loaded + v.incrementRefCount(); + if (lockCnt > 0) { + updateThreadCnt(threadId, key, 1); + } + } + return v; + }); + } finally { + readLock.unlock(); + } if (rcOmSnapshot == null) { + if (closed.get()) { + throw new IOException("Unable to open snapshot with SnapshotId: '" + key + "' since snapshot cache is closed."); + } // The only exception that would fall through the loader logic above // is OMException with FILE_NOT_FOUND. throw new OMException("SnapshotId: '" + key + "' not found, or the snapshot is no longer active.", @@ -239,11 +396,23 @@ void cleanup() { * @param referenceCounted ReferenceCounted object */ @Override - public void callback(ReferenceCounted referenceCounted) { + public void callback(ReferenceCounted referenceCounted) { + long threadId = Thread.currentThread().getId(); + UUID snapshotId = referenceCounted.get().getSnapshotID(); + // Remove snapshotRef from the thread count map. 
+ writeLock.lock(); + try { + if (lockCnt > 0) { + updateThreadCnt(threadId, snapshotId, -1); + } + dbClosed.signal(); + } finally { + writeLock.unlock(); + } + if (referenceCounted.getTotalRefCount() == 0L) { // Reference count reaches zero, add to pendingEvictionList - pendingEvictionQueue.add(((OmSnapshot) referenceCounted.get()) - .getSnapshotID()); + pendingEvictionQueue.add(snapshotId); } } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java index 8546690fa0ae..bffd76896793 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java @@ -80,7 +80,7 @@ static void beforeAll() throws Exception { void setUp() { // Reset cache for each test case omMetrics = OMMetrics.create(); - snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50); + snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, 5000); } @AfterEach diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 037e54d00085..3325d19188f9 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -365,7 +365,7 @@ public void init() throws RocksDBException, IOException, ExecutionException { omSnapshotManager = mock(OmSnapshotManager.class); when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); - SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0); + 
SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, 5000); when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString())) .thenAnswer(invocationOnMock -> { From 84c5c91031263adbb31a0958550fe215743117b8 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Fri, 24 Jan 2025 13:56:41 -0800 Subject: [PATCH 2/2] HDDS-12134. fix locking Change-Id: Ia5ecb2b863ba2ea2482ac2824aa7b6df98939bb5 --- .../ozone/om/snapshot/SnapshotCache.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java index 0a383f581262..a36b307a85af 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,6 +39,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND; @@ -71,7 +73,7 @@ public class SnapshotCache implements ReferenceCountedCallback, Auto private final Condition dbClosed; private final long lockTimeout; private final Map> snapshotRefThreadIds; - private AtomicBoolean closed; + private final AtomicBoolean closed; public SnapshotCache(CacheLoader cacheLoader, int cacheSizeLimit, OMMetrics omMetrics, long cleanupInterval, long lockTimeout) { @@ -88,6 +90,7 @@ public 
SnapshotCache(CacheLoader cacheLoader, int cacheSizeLim this.lockCnt = 0; this.lockReleased = this.readLock.newCondition(); this.dbClosed = this.writeLock.newCondition(); + this.closed = new AtomicBoolean(false); if (cleanupInterval > 0) { this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE, true, 1); @@ -162,16 +165,26 @@ public void close() { /** * Decreases the lock count. When the count reaches zero all new threads would be able to get a handle of snapshot. */ - private void decrementLockCount() { + private Runnable decrementLockCount() { lockCnt -= 1; - if (lockCnt <= 0) { + if (lockCnt < 0) { LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt); lockCnt = 0; } + if (lockCnt == 0) { - lockReleased.signalAll(); snapshotRefThreadIds.clear(); } + return () -> { + readLock.lock(); + try { + if (lockCnt == 0) { + lockReleased.signalAll(); + } + } finally { + readLock.unlock(); + } + }; } /** @@ -179,11 +192,13 @@ private void decrementLockCount() { */ public void releaseLock() { writeLock.lock(); + Runnable callback; try { - decrementLockCount(); + callback = decrementLockCount(); } finally { writeLock.unlock(); } + callback.run(); } /** @@ -199,6 +214,7 @@ public boolean tryAcquire(long timeout) throws InterruptedException { timeout = Long.MAX_VALUE; } if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + Runnable rollbackCallback = null; try { lockCnt += 1; if (lockCnt == 1) { @@ -213,14 +229,18 @@ public boolean tryAcquire(long timeout) throws InterruptedException { long currentTime = System.currentTimeMillis(); if (currentTime >= endTime) { // If and release acquired lock - decrementLockCount(); - return false; + rollbackCallback = decrementLockCount(); + break; } dbClosed.await(Math.min(endTime - currentTime, lockTimeout), TimeUnit.MILLISECONDS); } } finally { writeLock.unlock(); } + if (rollbackCallback != null) { + rollbackCallback.run(); + return false; + } invalidateAll(); return true; }