Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,31 @@

package org.apache.hadoop.ozone.lock;

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ratis.util.UncheckedAutoCloseable;

/** Bootstrap state handler interface. */
public interface BootstrapStateHandler {
Lock getBootstrapStateLock();

/** Bootstrap state handler lock implementation. */
class Lock implements AutoCloseable {
private final Semaphore semaphore = new Semaphore(1);
/** Bootstrap state handler lock implementation. Should always be acquired before opening any snapshot to avoid
* deadlocks*/
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closing `*/` is fused to the last word of the comment: the sentence has no terminating period and there is no space before `*/`. The comment should end with "... to avoid deadlocks. */".

Suggested change
* deadlocks*/
* deadlocks */

Copilot uses AI. Check for mistakes.
class Lock {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

public Lock lock() throws InterruptedException {
semaphore.acquire();
return this;
private UncheckedAutoCloseable lock(boolean readLock) {
java.util.concurrent.locks.Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock();
lock.lock();
return lock::unlock;
Comment on lines +35 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line returns a method reference to unlock the acquired lock. Specifically, it creates an UncheckedAutoCloseable instance that, when closed, calls unlock() on the same lock, releasing it. This enables usage of the lock in a try-with-resources style, ensuring the lock will be released when no longer needed.

}

public void unlock() {
semaphore.release();
public UncheckedAutoCloseable acquireWriteLock() throws InterruptedException {
return lock(false);
}

@Override
public void close() {
unlock();
public UncheckedAutoCloseable acquireReadLock() throws InterruptedException {
return lock(true);
Comment on lines +39 to +44
Copy link

Copilot AI Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods acquireWriteLock() and acquireReadLock() declare throws InterruptedException, but they cannot actually throw this exception. The underlying lock.lock() call (line 35) is a blocking operation that doesn't throw InterruptedException.

If you need interruptible locking, use lock.lockInterruptibly() instead. Otherwise, remove the throws InterruptedException declaration from both methods.

Copilot uses AI. Check for mistakes.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,7 +97,7 @@ public void initialize(DBStore store, DBCheckpointMetrics metrics,
this.aclEnabled = omAclEnabled;
this.admins = new OzoneAdmins(allowedAdminUsers, allowedAdminGroups);
this.isSpnegoEnabled = isSpnegoAuthEnabled;
lock = new Lock();
lock = new NoOpLock();

// Create a directory for temp bootstrap data
File dbLocation = dbStore.getDbLocation();
Expand Down Expand Up @@ -214,7 +215,7 @@ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServl
Set<String> receivedSstFiles = extractSstFilesToExclude(sstParam);
DBCheckpoint checkpoint = null;
Path tmpdir = null;
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireWriteLock()) {
tmpdir = Files.createTempDirectory(bootstrapTempData.toPath(),
"bootstrap-data-");
checkpoint = getCheckpoint(tmpdir, flush);
Expand Down Expand Up @@ -393,18 +394,22 @@ public BootstrapStateHandler.Lock getBootstrapStateLock() {
/**
* This lock is a no-op but can be overridden by child classes.
*/
public static class Lock extends BootstrapStateHandler.Lock {
public Lock() {
public static class NoOpLock extends BootstrapStateHandler.Lock {

private final UncheckedAutoCloseable noopLock = () -> {
};

public NoOpLock() {
}

@Override
public BootstrapStateHandler.Lock lock()
throws InterruptedException {
return this;
public UncheckedAutoCloseable acquireReadLock() {
return noopLock;
}

@Override
public void unlock() {
public UncheckedAutoCloseable acquireWriteLock() {
return noopLock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactionJobInfo;
Expand Down Expand Up @@ -1104,7 +1105,7 @@ public void pruneOlderSnapshotsWithCompactionHistory() {
sstFileNodesRemoved);
}

try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) {
removeSstFiles(sstFileNodesRemoved);
removeKeyFromCompactionLogTable(keysToRemove);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -1266,7 +1267,7 @@ public void pruneSstFiles() {
nonLeafSstFiles);
}

try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) {
removeSstFiles(nonLeafSstFiles);
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -1325,7 +1326,7 @@ public void pruneSstFileValues() {
prunedSSTFilePath.toFile().getAbsolutePath());

// Move pruned.sst.tmp => file.sst and replace existing file atomically.
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) {
Files.move(prunedSSTFilePath, sstFilePath,
StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -699,8 +699,8 @@ private void waitForLock(RocksDBCheckpointDiffer differ,

Future<Boolean> future;
// Take the lock and start the consumer.
try (BootstrapStateHandler.Lock lock =
differ.getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable lock =
differ.getBootstrapStateLock().acquireWriteLock()) {
future = executorService.submit(
() -> {
c.accept(differ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileWriter;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.util.Time;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1223,8 +1223,8 @@ private void waitForLock(RocksDBCheckpointDiffer differ,

Future<Boolean> future;
// Take the lock and start the consumer.
try (BootstrapStateHandler.Lock lock =
differ.getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable lock =
differ.getBootstrapStateLock().acquireWriteLock()) {
future = executorService.submit(
() -> {
c.accept(differ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void write(int b) throws IOException {
});

when(scmDbCheckpointServletMock.getBootstrapStateLock()).thenReturn(
new DBCheckpointServlet.Lock());
new DBCheckpointServlet.NoOpLock());
scmDbCheckpointServletMock.init();
long initialCheckpointCount =
scmMetrics.getDBCheckpointMetrics().getNumCheckpoints();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -856,8 +857,7 @@ private void testBootstrapLocking() throws Exception {

// Confirm the other handlers are locked out when the bootstrap
// servlet takes the lock.
try (BootstrapStateHandler.Lock ignoredLock =
spyServlet.getBootstrapStateLock().lock()) {
try (AutoCloseable ignoredLock = spyServlet.getBootstrapStateLock().acquireWriteLock()) {
confirmServletLocksOutOtherHandler(keyDeletingService, executorService);
confirmServletLocksOutOtherHandler(snapshotDeletingService,
executorService);
Expand Down Expand Up @@ -898,8 +898,7 @@ private void confirmServletLocksOutOtherHandler(BootstrapStateHandler handler,
private void confirmOtherHandlerLocksOutServlet(BootstrapStateHandler handler,
BootstrapStateHandler servlet, ExecutorService executorService)
throws InterruptedException {
try (BootstrapStateHandler.Lock ignoredLock =
handler.getBootstrapStateLock().lock()) {
try (UncheckedAutoCloseable ignoredLock = handler.getBootstrapStateLock().acquireWriteLock()) {
Future<Boolean> test = checkLock(servlet, executorService);
// Servlet should fail to lock when other handler has taken it.
assertThrows(TimeoutException.class,
Expand All @@ -912,8 +911,7 @@ private Future<Boolean> checkLock(BootstrapStateHandler handler,
ExecutorService executorService) {
return executorService.submit(() -> {
try {
handler.getBootstrapStateLock().lock();
handler.getBootstrapStateLock().unlock();
handler.getBootstrapStateLock().acquireWriteLock().close();
return true;
} catch (InterruptedException e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -637,10 +638,25 @@ public void testBootstrapLockCoordination() throws Exception {
RocksDBCheckpointDiffer mockCheckpointDiffer = mock(RocksDBCheckpointDiffer.class);
// Create mock locks for each service
BootstrapStateHandler.Lock mockDeletingLock = mock(BootstrapStateHandler.Lock.class);
UncheckedAutoCloseable mockDeletingAcquiredLock = mock(UncheckedAutoCloseable.class);
when(mockDeletingLock.acquireWriteLock()).thenReturn(mockDeletingAcquiredLock);

BootstrapStateHandler.Lock mockDirDeletingLock = mock(BootstrapStateHandler.Lock.class);
UncheckedAutoCloseable mockDirDeletingAcquiredLock = mock(UncheckedAutoCloseable.class);
when(mockDirDeletingLock.acquireWriteLock()).thenReturn(mockDirDeletingAcquiredLock);

BootstrapStateHandler.Lock mockFilteringLock = mock(BootstrapStateHandler.Lock.class);
UncheckedAutoCloseable mockFilteringAcquiredLock = mock(UncheckedAutoCloseable.class);
when(mockFilteringLock.acquireWriteLock()).thenReturn(mockFilteringAcquiredLock);

BootstrapStateHandler.Lock mockSnapshotDeletingLock = mock(BootstrapStateHandler.Lock.class);
UncheckedAutoCloseable mockSnapshotDeletingAcquiredLock = mock(UncheckedAutoCloseable.class);
when(mockSnapshotDeletingLock.acquireWriteLock()).thenReturn(mockSnapshotDeletingAcquiredLock);

BootstrapStateHandler.Lock mockCheckpointDifferLock = mock(BootstrapStateHandler.Lock.class);
UncheckedAutoCloseable mockCheckpointDifferAcquiredLock = mock(UncheckedAutoCloseable.class);
when(mockCheckpointDifferLock.acquireWriteLock()).thenReturn(mockCheckpointDifferAcquiredLock);

// Configure service mocks to return their respective locks
when(mockDeletingService.getBootstrapStateLock()).thenReturn(mockDeletingLock);
when(mockDirDeletingService.getBootstrapStateLock()).thenReturn(mockDirDeletingLock);
Expand All @@ -665,25 +681,23 @@ public void testBootstrapLockCoordination() throws Exception {
// Create the actual Lock instance (this tests the real implementation)
OMDBCheckpointServlet.Lock bootstrapLock = new OMDBCheckpointServlet.Lock(mockOM);
// Test successful lock acquisition
BootstrapStateHandler.Lock result = bootstrapLock.lock();
UncheckedAutoCloseable result = bootstrapLock.acquireWriteLock();
// Verify all service locks were acquired
verify(mockDeletingLock).lock();
verify(mockDirDeletingLock).lock();
verify(mockFilteringLock).lock();
verify(mockSnapshotDeletingLock).lock();
verify(mockCheckpointDifferLock).lock();
verify(mockDeletingLock).acquireWriteLock();
verify(mockDirDeletingLock).acquireWriteLock();
verify(mockFilteringLock).acquireWriteLock();
verify(mockSnapshotDeletingLock).acquireWriteLock();
verify(mockCheckpointDifferLock).acquireWriteLock();
// Verify double buffer flush was called
verify(mockOM).awaitDoubleBufferFlush();
// Verify the lock returns itself
assertEquals(bootstrapLock, result);
// Test unlock
bootstrapLock.unlock();
result.close();
// Verify all service locks were released
verify(mockDeletingLock).unlock();
verify(mockDirDeletingLock).unlock();
verify(mockFilteringLock).unlock();
verify(mockSnapshotDeletingLock).unlock();
verify(mockCheckpointDifferLock).unlock();
verify(mockDeletingAcquiredLock).close();
verify(mockDirDeletingAcquiredLock).close();
verify(mockFilteringAcquiredLock).close();
verify(mockSnapshotDeletingAcquiredLock).close();
verify(mockCheckpointDifferAcquiredLock).close();
}

/**
Expand All @@ -709,13 +723,11 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception {
AtomicInteger servicesSucceeded = new AtomicInteger(0);
// Checkpoint thread holds bootstrap lock
Thread checkpointThread = new Thread(() -> {
try {
LOG.info("Acquiring bootstrap lock for checkpoint...");
BootstrapStateHandler.Lock acquired = bootstrapLock.lock();
LOG.info("Acquiring bootstrap lock for checkpoint...");
try (UncheckedAutoCloseable acquired = bootstrapLock.acquireWriteLock()) {
bootstrapAcquired.countDown();
Thread.sleep(3000); // Hold for 3 seconds
LOG.info("Releasing bootstrap lock...");
acquired.unlock();
} catch (Exception e) {
fail("Checkpoint failed: " + e.getMessage());
}
Expand All @@ -729,11 +741,12 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception {
LOG.info("{} : Trying to acquire lock...", serviceName);
servicesBlocked.incrementAndGet();
BootstrapStateHandler.Lock serviceLock = service.getBootstrapStateLock();
serviceLock.lock(); // Should block!
servicesBlocked.decrementAndGet();
servicesSucceeded.incrementAndGet();
LOG.info(" {} : Lock acquired!", serviceName);
serviceLock.unlock();
try (UncheckedAutoCloseable lock = serviceLock.acquireReadLock()) {
// Should block!
servicesBlocked.decrementAndGet();
servicesSucceeded.incrementAndGet();
LOG.info(" {} : Lock acquired!", serviceName);
}
}
allServicesCompleted.countDown();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ public void shutdown() {
}

@Test
@Flaky("HDDS-13889")
@DisplayName("testSnapshotAndKeyDeletionBackgroundServices")
@SuppressWarnings("methodlength")
public void testSnapshotAndKeyDeletionBackgroundServices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -684,21 +686,21 @@ static class Lock extends BootstrapStateHandler.Lock {
}

@Override
public BootstrapStateHandler.Lock lock()
throws InterruptedException {
public UncheckedAutoCloseable acquireWriteLock() throws InterruptedException {
// First lock all the handlers.
List<UncheckedAutoCloseable> acquiredLocks = new ArrayList<>(locks.size());
for (BootstrapStateHandler.Lock lock : locks) {
lock.lock();
acquiredLocks.add(lock.acquireWriteLock());
}

// Then wait for the double buffer to be flushed.
om.awaitDoubleBufferFlush();
return this;
return () -> acquiredLocks.forEach(UncheckedAutoCloseable::close);
}

@Override
public void unlock() {
locks.forEach(BootstrapStateHandler.Lock::unlock);
public UncheckedAutoCloseable acquireReadLock() {
throw new UnsupportedOperationException("Read locks are not supported for OMDBCheckpointServlet");
}
}
}
Loading