Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,26 @@ public void release(UUID key) {
*/
public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK),
() -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK));
() -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> cleanup(true));
}

private UncheckedAutoCloseableSupplier<OMLockDetails> lock(
Supplier<OMLockDetails> lockFunction, Supplier<OMLockDetails> unlockFunction) {
/**
 * Takes the write lock that guards a single snapshot's database.
 * <p>
 * The lock is keyed by the string form of {@code snapshotId} under {@code SNAPSHOT_DB_LOCK}. Acquisition, release
 * and the per-snapshot {@code cleanup(snapshotId)} step are handed to the shared private {@code lock(...)} helper
 * as suppliers. The returned supplier releases the acquired lock when it is closed, so callers should close it
 * once they are done with the snapshot database.
 *
 * @param snapshotId ID of the snapshot whose database is to be write-locked.
 * @return an auto-closeable supplier of the lock details for this acquisition.
 */
public UncheckedAutoCloseableSupplier<OMLockDetails> lock(UUID snapshotId) {
  // Resolve the lock key once; the acquire and release callbacks both use it.
  final String snapshotKey = snapshotId.toString();

  Supplier<OMLockDetails> acquire = () -> lock.acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotKey);
  Supplier<OMLockDetails> release = () -> lock.releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotKey);
  Supplier<Void> evictSnapshot = () -> cleanup(snapshotId);

  return lock(acquire, release, evictSnapshot);
}

private UncheckedAutoCloseableSupplier<OMLockDetails> lock(Supplier<OMLockDetails> lockFunction,
Supplier<OMLockDetails> unlockFunction, Supplier<Void> cleanupFunction) {
AtomicReference<OMLockDetails> lockDetails = new AtomicReference<>(lockFunction.get());
if (lockDetails.get().isLockAcquired()) {
cleanup(true);
cleanupFunction.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we closing the snapshot instance upon acquiring a lock?

Copy link
Contributor Author

@swamirishi swamirishi Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want a RocksDB instance to remain open in the cache once a write lock has been acquired. We will use this to atomically switch the RocksDB directory after defragmenting a snapshot.

if (!dbMap.isEmpty()) {
lockDetails.set(unlockFunction.get());
}
Expand Down Expand Up @@ -324,43 +336,49 @@ public OMLockDetails get() {
* If cache size exceeds soft limit, attempt to clean up and close the
instances that has zero reference count.
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammatical error: 'instances that has' should be 'instances that have' for subject-verb agreement.

Suggested change
instances that has zero reference count.
instances that have zero reference count.

Copilot uses AI. Check for mistakes.
*/
private synchronized void cleanup(boolean force) {
private synchronized Void cleanup(boolean force) {
if (force || dbMap.size() > cacheSizeLimit) {
for (UUID evictionKey : pendingEvictionQueue) {
ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
if (snapshot != null && snapshot.getTotalRefCount() == 0) {
try {
compactSnapshotDB(snapshot.get());
} catch (IOException e) {
LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
evictionKey, e.getMessage());
}
}

dbMap.compute(evictionKey, (k, v) -> {
pendingEvictionQueue.remove(k);
if (v == null) {
throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
"instance of the Snapshot may not be closed properly.");
}
cleanup(evictionKey);
}
}
return null;
}

if (v.getTotalRefCount() > 0) {
LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
return v;
} else {
LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
// Close the instance, which also closes its DB handle.
try {
v.get().close();
} catch (IOException ex) {
throw new IllegalStateException("Error while closing snapshot DB.", ex);
}
omMetrics.decNumSnapshotCacheSize();
return null;
}
});
/**
 * Attempts to evict a single snapshot from the cache.
 * <p>
 * If the cached entry for {@code evictionKey} currently has a total reference count of zero, its DB is compacted
 * first; a compaction failure is only logged. The entry is then re-examined inside {@code dbMap.compute}: it is
 * closed and dropped from the map when it is still unreferenced, or kept when references remain. In both cases
 * the key is removed from {@code pendingEvictionQueue}.
 *
 * @param evictionKey ID of the snapshot to evict.
 * @return always {@code null}; the {@code Void} return type lets the call be wrapped as a {@code Supplier<Void>}.
 * @throws IllegalStateException if {@code evictionKey} has no entry in {@code dbMap}, or if closing the snapshot
 *         fails with an {@link IOException}.
 */
private synchronized Void cleanup(UUID evictionKey) {
ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
// Compact only when nothing references the snapshot; eviction proceeds even if compaction fails.
if (snapshot != null && snapshot.getTotalRefCount() == 0) {
try {
compactSnapshotDB(snapshot.get());
} catch (IOException e) {
LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
evictionKey, e.getMessage());
}
}

dbMap.compute(evictionKey, (k, v) -> {
// The key leaves the pending-eviction queue regardless of the outcome below.
pendingEvictionQueue.remove(k);
// NOTE(review): lock(UUID) forwards arbitrary snapshot IDs to this method, so an ID that was never cached
// would end up here and throw — confirm that is the intended behavior for that caller.
if (v == null) {
throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
"instance of the Snapshot may not be closed properly.");
}

if (v.getTotalRefCount() > 0) {
LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
// Keep the existing mapping untouched.
return v;
} else {
LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
// Close the instance, which also closes its DB handle.
try {
v.get().close();
} catch (IOException ex) {
throw new IllegalStateException("Error while closing snapshot DB.", ex);
}
omMetrics.decNumSnapshotCacheSize();
// Returning null from compute() drops the entry from the map (per the Map.compute contract).
return null;
}
});
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,26 @@ public void testGetHoldsReadLock(int numberOfLocks) throws IOException {
@ParameterizedTest
@ValueSource(ints = {0, 1, 5, 10})
@DisplayName("Tests lock() holds a write lock")
public void testGetHoldsWriteLock(int numberOfLocks) {
public void testLockHoldsWriteLock(int numberOfLocks) {
clearInvocations(lock);
for (int i = 0; i < numberOfLocks; i++) {
snapshotCache.lock();
}
verify(lock, times(numberOfLocks)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
}

/**
 * Verifies that each call to {@code lock(snapshotId)} requests the write lock once, keyed by the snapshot ID's
 * string form under {@code SNAPSHOT_DB_LOCK}.
 */
@ParameterizedTest
@ValueSource(ints = {0, 1, 5, 10})
@DisplayName("Tests lock(snapshotId) holds a write lock")
public void testLockHoldsWriteLockSnapshotId(int numberOfLocks) {
  final UUID snapshotId = UUID.randomUUID();
  final String expectedLockKey = snapshotId.toString();
  // Discard interactions recorded during setup so only this test's calls are counted.
  clearInvocations(lock);

  int remaining = numberOfLocks;
  while (remaining > 0) {
    snapshotCache.lock(snapshotId);
    remaining--;
  }

  verify(lock, times(numberOfLocks)).acquireWriteLock(eq(SNAPSHOT_DB_LOCK), eq(expectedLockKey));
}

@Test
@DisplayName("get() same entry twice yields one cache entry only")
void testGetTwice() throws IOException {
Expand Down
Loading