diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 512c055e1000..2e00447f7f92 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4449,6 +4449,16 @@
+
+ ozone.om.snapshot.cache.lock.timeout
+ 5s
+ OZONE, OM
+
+ Wait Timeout for snapshot cache lock.
+ Uses milliseconds by default when no time unit is specified.
+
+
+
ozone.om.snapshot.load.native.lib
true
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 3a09ae6f20b6..c4beced0e549 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -591,6 +591,12 @@ private OMConfigKeys() {
public static final long
OZONE_OM_SNAPSHOT_DIFF_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT
= TimeUnit.MINUTES.toMillis(1);
+ public static final String
+ OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT
+ = "ozone.om.snapshot.cache.lock.timeout";
+ public static final long
+ OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT_DEFAULT
+ = TimeUnit.SECONDS.toMillis(5);
public static final long
OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT
= TimeUnit.MINUTES.toMillis(1);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 14409f8c6387..fe09ede0102d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -26,6 +26,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES;
@@ -279,8 +281,12 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
.getTimeDuration(OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL,
OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
+ long cacheLockTimeout = ozoneManager.getConfiguration()
+ .getTimeDuration(OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT,
+ OZONE_OM_SNAPSHOT_CACHE_LOCK_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(),
- cacheCleanupServiceInterval);
+ cacheCleanupServiceInterval, cacheLockTimeout);
this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
ozoneManager, snapDiffJobCf, snapDiffReportCf,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java
index 2f85f6d21b4c..948d26aa1e28 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.om.snapshot;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,10 +52,10 @@ public class ReferenceCounted
/**
* Parent instance whose callback will be triggered upon this RC closure.
*/
- private final ReferenceCountedCallback parentWithCallback;
+ private final ReferenceCountedCallback parentWithCallback;
public ReferenceCounted(T obj, boolean disableCounter,
- ReferenceCountedCallback parentWithCallback) {
+ ReferenceCountedCallback parentWithCallback) {
// A param to allow disabling ref counting to reduce active DB
// access penalties due to AtomicLong operations.
this.obj = obj;
@@ -125,9 +127,7 @@ public long decrementRefCount() {
Preconditions.checkState(newValTotal >= 0L,
"Total reference count underflow");
}
- if (refCount.get() == 0) {
- this.parentWithCallback.callback(this);
- }
+ this.parentWithCallback.callback(this);
return refCount.get();
}
@@ -160,4 +160,8 @@ public void close() {
// so it is eligible to be used with try-with-resources.
decrementRefCount();
}
+
+ public Map getThreadCntMap() {
+ return ImmutableMap.copyOf(threadMap);
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java
index 1572459166e6..62efb563e042 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCountedCallback.java
@@ -20,6 +20,6 @@
/**
* Callback interface for ReferenceCounted.
*/
-public interface ReferenceCountedCallback {
- void callback(ReferenceCounted referenceCounted);
+public interface ReferenceCountedCallback {
+ void callback(ReferenceCounted referenceCounted);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index d2b64296dc60..4d9eae223b7b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -22,10 +22,17 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
import java.io.IOException;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -36,7 +43,7 @@
/**
* Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
*/
-public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
+public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
@@ -45,9 +52,7 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
// Value: OmSnapshot instance, each holds a DB instance handle inside
// TODO: [SNAPSHOT] Consider wrapping SoftReference<> around IOmMetadataReader
private final ConcurrentHashMap> dbMap;
-
private final CacheLoader cacheLoader;
-
// Soft-limit of the total number of snapshot DB instances allowed to be
// opened on the OM.
private final int cacheSizeLimit;
@@ -57,14 +62,32 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable {
"SnapshotCacheCleanupService";
private final OMMetrics omMetrics;
+ private final ReadWriteLock lock;
+ private final Lock readLock;
+ private final Lock writeLock;
+ private int lockCnt;
+ private final Condition lockReleased;
+ private final Condition dbClosed;
+ private final long lockTimeout;
+ private final Map> snapshotRefThreadIds;
+ private final AtomicBoolean closed;
public SnapshotCache(CacheLoader cacheLoader, int cacheSizeLimit, OMMetrics omMetrics,
- long cleanupInterval) {
+ long cleanupInterval, long lockTimeout) {
this.dbMap = new ConcurrentHashMap<>();
this.cacheLoader = cacheLoader;
this.cacheSizeLimit = cacheSizeLimit;
this.omMetrics = omMetrics;
this.pendingEvictionQueue = ConcurrentHashMap.newKeySet();
+ this.lock = new ReentrantReadWriteLock();
+ this.readLock = lock.readLock();
+ this.writeLock = lock.writeLock();
+ this.snapshotRefThreadIds = new ConcurrentHashMap<>();
+ this.lockTimeout = lockTimeout;
+ this.lockCnt = 0;
+ this.lockReleased = this.readLock.newCondition();
+ this.dbClosed = this.writeLock.newCondition();
+ this.closed = new AtomicBoolean(false);
if (cleanupInterval > 0) {
this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE,
true, 1);
@@ -96,10 +119,21 @@ public void invalidate(UUID key) {
if (v == null) {
LOG.warn("SnapshotId: '{}' does not exist in snapshot cache.", k);
} else {
+ readLock.lock();
try {
+ if (lockCnt > 0) {
+ for (Long tid : snapshotRefThreadIds.keySet()) {
+ snapshotRefThreadIds.computeIfPresent(tid, (threadId, map) -> {
+ map.computeIfPresent(key, (uuid, val) -> null);
+ return map.isEmpty() ? null : map;
+ });
+ }
+ }
v.get().close();
} catch (IOException e) {
throw new IllegalStateException("Failed to close snapshotId: " + key, e);
+ } finally {
+ readLock.unlock();
}
omMetrics.decNumSnapshotCacheSize();
}
@@ -118,12 +152,136 @@ public void invalidateAll() {
@Override
public void close() {
+ closed.set(true);
invalidateAll();
if (this.scheduler != null) {
this.scheduler.close();
}
}
+ /**
+ * Decreases the lock count. When the count reaches zero all new threads would be able to get a handle of snapshot.
+ */
+ private Runnable decrementLockCount() {
+ lockCnt -= 1;
+ if (lockCnt <= 0) {
+ LOG.warn("Invalid negative lock count : {}. Setting it to 0", lockCnt);
+ lockCnt = 0;
+ }
+
+ if (lockCnt == 0) {
+ snapshotRefThreadIds.clear();
+ }
+ return () -> {
+ readLock.lock();
+ try {
+ if (lockCnt == 0) {
+ lockReleased.signalAll();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ };
+ }
+
+ /**
+ * Releases a lock on the cache.
+ */
+ public void releaseLock() {
+ writeLock.lock();
+ Runnable callback = decrementLockCount();
+ try {
+ decrementLockCount();
+ } finally {
+ writeLock.unlock();
+ }
+ callback.run();
+ }
+
+ /**
+ * Acquires lock on the cache within max amount time.
+ * @param timeout Max time to wait to acquire lock.
+ * @return true if lock is acquired otherwise false.
+ * @throws InterruptedException
+ */
+ public boolean tryAcquire(long timeout) throws InterruptedException {
+ long endTime = System.currentTimeMillis() + timeout;
+ if (timeout <= 0) {
+ endTime = Long.MAX_VALUE;
+ timeout = Long.MAX_VALUE;
+ }
+ if (writeLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ Runnable rollbackCallback = null;
+ try {
+ lockCnt += 1;
+ if (lockCnt == 1) {
+ snapshotRefThreadIds.clear();
+ dbMap.values().stream()
+ .flatMap(referenceCounted ->
+ referenceCounted.getThreadCntMap().entrySet().stream().map(entry -> Pair.of(entry, referenceCounted)))
+ .forEach(entry -> updateThreadCnt(entry.getKey().getKey(), entry.getValue().get().getSnapshotID(),
+ entry.getKey().getValue()));
+ }
+ while (!snapshotRefThreadIds.isEmpty()) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime >= endTime) {
+ // If and release acquired lock
+ rollbackCallback = decrementLockCount();
+ break;
+ }
+ dbClosed.await(Math.min(endTime - currentTime, lockTimeout), TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ if (rollbackCallback != null) {
+ rollbackCallback.run();
+ return false;
+ }
+ invalidateAll();
+ return true;
+ }
+ return false;
+ }
+
+ private void updateThreadCnt(long threadId, UUID key, long cnt) {
+ snapshotRefThreadIds.compute(threadId, (tid, countMap) -> {
+ if (countMap == null) {
+ if (cnt <= 0) {
+ return null;
+ }
+ countMap = new ConcurrentHashMap<>();
+ }
+ countMap.compute(key, (snapId, count) -> {
+ if (count == null) {
+ count = 0L;
+ }
+ count += cnt;
+ return count > 0 ? count : null;
+ });
+ return countMap.isEmpty() ? null : countMap;
+ });
+ }
+
+ /**
+ * Waits for lock to be released. This function doesn't wait for the lock if the thread already has a few snapshots
+ * open. It only waits if the thread is reading it's first snapshot.
+ * @param threadId
+ * @throws InterruptedException
+ */
+ private void waitForLock(long threadId) throws IOException {
+ if (snapshotRefThreadIds.computeIfPresent(threadId, (k, v) -> v) != null) {
+ while (lockCnt > 0) {
+ try {
+ lockReleased.await(lockTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new IOException("Error while waiting for locks to be released.", e);
+ }
+
+ }
+ }
+ }
+
/**
* State the reason the current thread is getting the OmSnapshot instance.
* Unused for now.
@@ -149,33 +307,50 @@ public ReferenceCounted get(UUID key) throws IOException {
}
// Atomic operation to initialize the OmSnapshot instance (once) if the key
// does not exist, and increment the reference count on the instance.
- ReferenceCounted rcOmSnapshot =
- dbMap.compute(key, (k, v) -> {
- if (v == null) {
- LOG.info("Loading SnapshotId: '{}'", k);
- try {
- v = new ReferenceCounted<>(cacheLoader.load(key), false, this);
- } catch (OMException omEx) {
- // Return null if the snapshot is no longer active
- if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
- throw new IllegalStateException(omEx);
+ ReferenceCounted rcOmSnapshot;
+ readLock.lock();
+ try {
+ long threadId = Thread.currentThread().getId();
+ waitForLock(threadId);
+ rcOmSnapshot =
+ dbMap.compute(key, (k, v) -> {
+ if (v == null) {
+ if (closed.get()) {
+ return v;
}
- } catch (IOException ioEx) {
- // Failed to load snapshot DB
- throw new IllegalStateException(ioEx);
- } catch (Exception ex) {
- // Unexpected and unknown exception thrown from CacheLoader#load
- throw new IllegalStateException(ex);
+ LOG.info("Loading SnapshotId: '{}'", k);
+ try {
+ v = new ReferenceCounted<>(cacheLoader.load(key), false, this);
+ } catch (OMException omEx) {
+ // Return null if the snapshot is no longer active
+ if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+ throw new IllegalStateException(omEx);
+ }
+ } catch (IOException ioEx) {
+ // Failed to load snapshot DB
+ throw new IllegalStateException(ioEx);
+ } catch (Exception ex) {
+ // Unexpected and unknown exception thrown from CacheLoader#load
+ throw new IllegalStateException(ex);
+ }
+ omMetrics.incNumSnapshotCacheSize();
}
- omMetrics.incNumSnapshotCacheSize();
- }
- if (v != null) {
- // When RC OmSnapshot is successfully loaded
- v.incrementRefCount();
- }
- return v;
- });
+ if (v != null) {
+ // When RC OmSnapshot is successfully loaded
+ v.incrementRefCount();
+ if (lockCnt > 0) {
+ updateThreadCnt(threadId, key, 1);
+ }
+ }
+ return v;
+ });
+ } finally {
+ readLock.unlock();
+ }
if (rcOmSnapshot == null) {
+ if (closed.get()) {
+ throw new IOException("Unable to open snapshot with SnapshotId: '" + key + "' since snapshot cache is closed.");
+ }
// The only exception that would fall through the loader logic above
// is OMException with FILE_NOT_FOUND.
throw new OMException("SnapshotId: '" + key + "' not found, or the snapshot is no longer active.",
@@ -238,11 +413,23 @@ void cleanup() {
* @param referenceCounted ReferenceCounted object
*/
@Override
- public void callback(ReferenceCounted referenceCounted) {
+ public void callback(ReferenceCounted referenceCounted) {
+ long threadId = Thread.currentThread().getId();
+ UUID snapshotId = referenceCounted.get().getSnapshotID();
+ // Remove snapshotRef from the thread count map.
+ writeLock.lock();
+ try {
+ if (lockCnt > 0) {
+ updateThreadCnt(threadId, snapshotId, -1);
+ }
+ dbClosed.signal();
+ } finally {
+ writeLock.unlock();
+ }
+
if (referenceCounted.getTotalRefCount() == 0L) {
// Reference count reaches zero, add to pendingEvictionList
- pendingEvictionQueue.add(((OmSnapshot) referenceCounted.get())
- .getSnapshotID());
+ pendingEvictionQueue.add(snapshotId);
}
}
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
index 4c8a2a5f60b6..b11a26f7bf64 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
@@ -79,7 +79,7 @@ static void beforeAll() throws Exception {
void setUp() {
// Reset cache for each test case
omMetrics = OMMetrics.create();
- snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50);
+ snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, 5000);
}
@AfterEach
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index cd0c00839899..9cb1b2aa4fc7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -364,7 +364,7 @@ public void init() throws RocksDBException, IOException, ExecutionException {
omSnapshotManager = mock(OmSnapshotManager.class);
when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
- SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0);
+ SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, 5000);
when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString()))
.thenAnswer(invocationOnMock -> {