diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java index d6de873b842e..eee070e9d2c9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/BootstrapStateHandler.java @@ -17,28 +17,31 @@ package org.apache.hadoop.ozone.lock; -import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ratis.util.UncheckedAutoCloseable; /** Bootstrap state handler interface. */ public interface BootstrapStateHandler { Lock getBootstrapStateLock(); - /** Bootstrap state handler lock implementation. */ - class Lock implements AutoCloseable { - private final Semaphore semaphore = new Semaphore(1); + /** Bootstrap state handler lock implementation. Should be always acquired before opening any snapshot to avoid + * deadlocks*/ + class Lock { + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - public Lock lock() throws InterruptedException { - semaphore.acquire(); - return this; + private UncheckedAutoCloseable lock(boolean readLock) { + java.util.concurrent.locks.Lock lock = readLock ? readWriteLock.readLock() : readWriteLock.writeLock(); + lock.lock(); + return lock::unlock; } - public void unlock() { - semaphore.release(); + public UncheckedAutoCloseable acquireWriteLock() throws InterruptedException { + return lock(false); } - @Override - public void close() { - unlock(); + public UncheckedAutoCloseable acquireReadLock() throws InterruptedException { + return lock(true); } } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java index dae2df9e5c38..2e0bbc5eb151 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +97,7 @@ public void initialize(DBStore store, DBCheckpointMetrics metrics, this.aclEnabled = omAclEnabled; this.admins = new OzoneAdmins(allowedAdminUsers, allowedAdminGroups); this.isSpnegoEnabled = isSpnegoAuthEnabled; - lock = new Lock(); + lock = new NoOpLock(); // Create a directory for temp bootstrap data File dbLocation = dbStore.getDbLocation(); @@ -214,7 +215,7 @@ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServl Set receivedSstFiles = extractSstFilesToExclude(sstParam); DBCheckpoint checkpoint = null; Path tmpdir = null; - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireWriteLock()) { tmpdir = Files.createTempDirectory(bootstrapTempData.toPath(), "bootstrap-data-"); checkpoint = getCheckpoint(tmpdir, flush); @@ -393,18 +394,22 @@ public BootstrapStateHandler.Lock getBootstrapStateLock() { /** * This lock is a no-op but can overridden by child classes. */ - public static class Lock extends BootstrapStateHandler.Lock { - public Lock() { + public static class NoOpLock extends BootstrapStateHandler.Lock { + + private final UncheckedAutoCloseable noopLock = () -> { + }; + + public NoOpLock() { } @Override - public BootstrapStateHandler.Lock lock() - throws InterruptedException { - return this; + public UncheckedAutoCloseable acquireReadLock() { + return noopLock; } @Override - public void unlock() { + public UncheckedAutoCloseable acquireWriteLock() { + return noopLock; } } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index 44cbc45ad6b5..9de00948da79 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -85,6 +85,7 @@ import org.apache.ozone.compaction.log.CompactionFileInfo; import org.apache.ozone.compaction.log.CompactionLogEntry; import org.apache.ozone.rocksdb.util.RdbUtil; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.rocksdb.AbstractEventListener; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactionJobInfo; @@ -1104,7 +1105,7 @@ public void pruneOlderSnapshotsWithCompactionHistory() { sstFileNodesRemoved); } - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) { removeSstFiles(sstFileNodesRemoved); removeKeyFromCompactionLogTable(keysToRemove); } catch (InterruptedException e) { @@ -1266,7 +1267,7 @@ public void pruneSstFiles() { nonLeafSstFiles); } - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) { removeSstFiles(nonLeafSstFiles); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -1325,7 +1326,7 @@ public void pruneSstFileValues() { prunedSSTFilePath.toFile().getAbsolutePath()); // Move pruned.sst.tmp => file.sst and replace existing file atomically. - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) { Files.move(prunedSSTFilePath, sstFilePath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java index 025aabd1cb22..f9733eec24e2 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java @@ -65,9 +65,9 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.ozone.compaction.log.CompactionLogEntry; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -699,8 +699,8 @@ private void waitForLock(RocksDBCheckpointDiffer differ, Future future; // Take the lock and start the consumer. - try (BootstrapStateHandler.Lock lock = - differ.getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = + differ.getBootstrapStateLock().acquireWriteLock()) { future = executorService.submit( () -> { c.accept(differ); diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java index c59f6aeb491f..d24790dfcfc8 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java @@ -101,13 +101,13 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader; import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileWriter; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.util.Time; import org.apache.ozone.compaction.log.CompactionFileInfo; import org.apache.ozone.compaction.log.CompactionLogEntry; import org.apache.ozone.rocksdb.util.RdbUtil; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -1223,8 +1223,8 @@ private void waitForLock(RocksDBCheckpointDiffer differ, Future future; // Take the lock and start the consumer. - try (BootstrapStateHandler.Lock lock = - differ.getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = + differ.getBootstrapStateLock().acquireWriteLock()) { future = executorService.submit( () -> { c.accept(differ); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java index 3715d4ede368..31d00b7228ae 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java @@ -171,7 +171,7 @@ public void write(int b) throws IOException { }); when(scmDbCheckpointServletMock.getBootstrapStateLock()).thenReturn( - new DBCheckpointServlet.Lock()); + new DBCheckpointServlet.NoOpLock()); scmDbCheckpointServletMock.init(); long initialCheckpointCount = scmMetrics.getDBCheckpointMetrics().getNumCheckpoints(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index d0b38116d5fa..df15d50e1506 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -114,6 +114,7 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ozone.test.GenericTestUtils; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -856,8 +857,7 @@ private void testBootstrapLocking() throws Exception { // Confirm the other handlers are locked out when the bootstrap // servlet takes the lock. - try (BootstrapStateHandler.Lock ignoredLock = - spyServlet.getBootstrapStateLock().lock()) { + try (AutoCloseable ignoredLock = spyServlet.getBootstrapStateLock().acquireWriteLock()) { confirmServletLocksOutOtherHandler(keyDeletingService, executorService); confirmServletLocksOutOtherHandler(snapshotDeletingService, executorService); @@ -898,8 +898,7 @@ private void confirmServletLocksOutOtherHandler(BootstrapStateHandler handler, private void confirmOtherHandlerLocksOutServlet(BootstrapStateHandler handler, BootstrapStateHandler servlet, ExecutorService executorService) throws InterruptedException { - try (BootstrapStateHandler.Lock ignoredLock = - handler.getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable ignoredLock = handler.getBootstrapStateLock().acquireWriteLock()) { Future test = checkLock(servlet, executorService); // Servlet should fail to lock when other handler has taken it. assertThrows(TimeoutException.class, @@ -912,8 +911,7 @@ private Future checkLock(BootstrapStateHandler handler, ExecutorService executorService) { return executorService.submit(() -> { try { - handler.getBootstrapStateLock().lock(); - handler.getBootstrapStateLock().unlock(); + handler.getBootstrapStateLock().acquireWriteLock().close(); return true; } catch (InterruptedException e) { } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index b936d7ab5180..7c8f2eb8db4d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -114,6 +114,7 @@ import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -637,10 +638,25 @@ public void testBootstrapLockCoordination() throws Exception { RocksDBCheckpointDiffer mockCheckpointDiffer = mock(RocksDBCheckpointDiffer.class); // Create mock locks for each service BootstrapStateHandler.Lock mockDeletingLock = mock(BootstrapStateHandler.Lock.class); + UncheckedAutoCloseable mockDeletingAcquiredLock = mock(UncheckedAutoCloseable.class); + when(mockDeletingLock.acquireWriteLock()).thenReturn(mockDeletingAcquiredLock); + BootstrapStateHandler.Lock mockDirDeletingLock = mock(BootstrapStateHandler.Lock.class); + UncheckedAutoCloseable mockDirDeletingAcquiredLock = mock(UncheckedAutoCloseable.class); + when(mockDirDeletingLock.acquireWriteLock()).thenReturn(mockDirDeletingAcquiredLock); + BootstrapStateHandler.Lock mockFilteringLock = mock(BootstrapStateHandler.Lock.class); + UncheckedAutoCloseable mockFilteringAcquiredLock = mock(UncheckedAutoCloseable.class); + when(mockFilteringLock.acquireWriteLock()).thenReturn(mockFilteringAcquiredLock); + BootstrapStateHandler.Lock mockSnapshotDeletingLock = mock(BootstrapStateHandler.Lock.class); + UncheckedAutoCloseable mockSnapshotDeletingAcquiredLock = mock(UncheckedAutoCloseable.class); + when(mockSnapshotDeletingLock.acquireWriteLock()).thenReturn(mockSnapshotDeletingAcquiredLock); + BootstrapStateHandler.Lock mockCheckpointDifferLock = mock(BootstrapStateHandler.Lock.class); + UncheckedAutoCloseable mockCheckpointDifferAcquiredLock = mock(UncheckedAutoCloseable.class); + when(mockCheckpointDifferLock.acquireWriteLock()).thenReturn(mockCheckpointDifferAcquiredLock); + // Configure service mocks to return their respective locks when(mockDeletingService.getBootstrapStateLock()).thenReturn(mockDeletingLock); when(mockDirDeletingService.getBootstrapStateLock()).thenReturn(mockDirDeletingLock); @@ -665,25 +681,23 @@ public void testBootstrapLockCoordination() throws Exception { // Create the actual Lock instance (this tests the real implementation) OMDBCheckpointServlet.Lock bootstrapLock = new OMDBCheckpointServlet.Lock(mockOM); // Test successful lock acquisition - BootstrapStateHandler.Lock result = bootstrapLock.lock(); + UncheckedAutoCloseable result = bootstrapLock.acquireWriteLock(); // Verify all service locks were acquired - verify(mockDeletingLock).lock(); - verify(mockDirDeletingLock).lock(); - verify(mockFilteringLock).lock(); - verify(mockSnapshotDeletingLock).lock(); - verify(mockCheckpointDifferLock).lock(); + verify(mockDeletingLock).acquireWriteLock(); + verify(mockDirDeletingLock).acquireWriteLock(); + verify(mockFilteringLock).acquireWriteLock(); + verify(mockSnapshotDeletingLock).acquireWriteLock(); + verify(mockCheckpointDifferLock).acquireWriteLock(); // Verify double buffer flush was called verify(mockOM).awaitDoubleBufferFlush(); - // Verify the lock returns itself - assertEquals(bootstrapLock, result); // Test unlock - bootstrapLock.unlock(); + result.close(); // Verify all service locks were released - verify(mockDeletingLock).unlock(); - verify(mockDirDeletingLock).unlock(); - verify(mockFilteringLock).unlock(); - verify(mockSnapshotDeletingLock).unlock(); - verify(mockCheckpointDifferLock).unlock(); + verify(mockDeletingAcquiredLock).close(); + verify(mockDirDeletingAcquiredLock).close(); + verify(mockFilteringAcquiredLock).close(); + verify(mockSnapshotDeletingAcquiredLock).close(); + verify(mockCheckpointDifferAcquiredLock).close(); } /** @@ -709,13 +723,11 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception { AtomicInteger servicesSucceeded = new AtomicInteger(0); // Checkpoint thread holds bootstrap lock Thread checkpointThread = new Thread(() -> { - try { - LOG.info("Acquiring bootstrap lock for checkpoint..."); - BootstrapStateHandler.Lock acquired = bootstrapLock.lock(); + LOG.info("Acquiring bootstrap lock for checkpoint..."); + try (UncheckedAutoCloseable acquired = bootstrapLock.acquireWriteLock()) { bootstrapAcquired.countDown(); Thread.sleep(3000); // Hold for 3 seconds LOG.info("Releasing bootstrap lock..."); - acquired.unlock(); } catch (Exception e) { fail("Checkpoint failed: " + e.getMessage()); } @@ -729,11 +741,12 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception { LOG.info("{} : Trying to acquire lock...", serviceName); servicesBlocked.incrementAndGet(); BootstrapStateHandler.Lock serviceLock = service.getBootstrapStateLock(); - serviceLock.lock(); // Should block! - servicesBlocked.decrementAndGet(); - servicesSucceeded.incrementAndGet(); - LOG.info(" {} : Lock acquired!", serviceName); - serviceLock.unlock(); + try (UncheckedAutoCloseable lock = serviceLock.acquireReadLock()) { + // Should block! + servicesBlocked.decrementAndGet(); + servicesSucceeded.incrementAndGet(); + LOG.info(" {} : Lock acquired!", serviceName); + } } allServicesCompleted.countDown(); } catch (Exception e) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java index eabafc45176c..635002aa0986 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java @@ -191,7 +191,6 @@ public void shutdown() { } @Test - @Flaky("HDDS-13889") @DisplayName("testSnapshotAndKeyDeletionBackgroundServices") @SuppressWarnings("methodlength") public void testSnapshotAndKeyDeletionBackgroundServices() diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index efe9fc0aeea9..ebc77f5da7d1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -40,6 +40,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -74,6 +75,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -684,21 +686,21 @@ static class Lock extends BootstrapStateHandler.Lock { } @Override - public BootstrapStateHandler.Lock lock() - throws InterruptedException { + public UncheckedAutoCloseable acquireWriteLock() throws InterruptedException { // First lock all the handlers. + List acquiredLocks = new ArrayList<>(locks.size()); for (BootstrapStateHandler.Lock lock : locks) { - lock.lock(); + acquiredLocks.add(lock.acquireWriteLock()); } // Then wait for the double buffer to be flushed. om.awaitDoubleBufferFlush(); - return this; + return () -> acquiredLocks.forEach(UncheckedAutoCloseable::close); } @Override - public void unlock() { - locks.forEach(BootstrapStateHandler.Lock::unlock); + public UncheckedAutoCloseable acquireReadLock() { + throw new UnsupportedOperationException("Read locks are not supported for OMDBCheckpointServlet"); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index 3291c37a0b8c..4eb49db24aa3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.util.Time; import org.apache.ozone.compaction.log.CompactionLogEntry; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,7 +149,7 @@ public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServl OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST); Set receivedSstFiles = extractFilesToExclude(sstParam); Path tmpdir = null; - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireWriteLock()) { tmpdir = Files.createTempDirectory(getBootstrapTempData().toPath(), "bootstrap-data-"); if (tmpdir == null) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java index 8ff0a71d68d8..e7c76bc539d6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java @@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,7 +186,7 @@ public BackgroundTaskResult call() throws Exception { ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(), snapshotInfo.getBucketName()); - try ( + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock(); UncheckedAutoCloseableSupplier snapshotMetadataReader = snapshotManager.get().getActiveSnapshot( snapshotInfo.getVolumeName(), @@ -195,10 +196,8 @@ public BackgroundTaskResult call() throws Exception { RDBStore rdbStore = (RDBStore) omSnapshot.getMetadataManager() .getStore(); RocksDatabase db = rdbStore.getDb(); - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock() - .lock()) { - db.deleteFilesNotMatchingPrefix(bucketPrefixInfo); - } + db.deleteFilesNotMatchingPrefix(bucketPrefixInfo); + markSSTFilteredFlagForSnapshot(snapshotInfo); snapshotLimit--; snapshotFilteredCount.getAndIncrement(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 5650ab7734e2..5a7fc28d7627 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.DeletingServiceMetrics; @@ -36,6 +39,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.util.UncheckedAutoCloseable; /** * Abstracts common code from KeyDeletingService and DirectoryDeletingService @@ -67,6 +71,9 @@ public AbstractKeyDeletingService(String serviceName, long interval, this.suspended = new AtomicBoolean(false); } + @Override + public abstract DeletingServiceTaskQueue getTasks(); + protected OMResponse submitRequest(OMRequest omRequest) throws ServiceException { return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet()); } @@ -92,6 +99,36 @@ boolean isPreviousPurgeTransactionFlushed() throws IOException { return true; } + /** + * A specialized implementation of {@link BackgroundTaskQueue} that modifies + * the behavior of added tasks to utilize a read lock during execution. + * + * This class ensures that every {@link BackgroundTask} added to the queue + * is wrapped such that its execution acquires a read lock via + * {@code getBootstrapStateLock().acquireReadLock()} before performing any + * operations. The lock is automatically released upon task completion or + * exception, ensuring safe concurrent execution of tasks within the service when running along with bootstrap flow. + */ + public class DeletingServiceTaskQueue extends BackgroundTaskQueue { + @Override + public synchronized void add(BackgroundTask task) { + super.add(new BackgroundTask() { + + @Override + public BackgroundTaskResult call() throws Exception { + try (UncheckedAutoCloseable readLock = getBootstrapStateLock().acquireReadLock()) { + return task.call(); + } + } + + @Override + public int getPriority() { + return task.getPriority(); + } + }); + } + } + /** * Suspend the service. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 001e686455f1..c78330b74425 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -53,14 +53,12 @@ import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.utils.BackgroundTask; -import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.Table.KeyValue; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.ClientVersion; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.DeleteKeysResult; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMConfigKeys; @@ -208,8 +206,8 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) { } @Override - public BackgroundTaskQueue getTasks() { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); + public DeletingServiceTaskQueue getTasks() { + DeletingServiceTaskQueue queue = new DeletingServiceTaskQueue(); queue.add(new DirDeletingTask(null)); if (deepCleanSnapshots) { Iterator iterator = null; @@ -266,7 +264,7 @@ void optimizeDirDeletesAndSubmitRequest( CheckedFunction, Boolean, IOException> reclaimableDirChecker, CheckedFunction, Boolean, IOException> reclaimableFileChecker, Map bucketNameInfoMap, - UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException { + UUID expectedPreviousSnapshotId, long rnCnt) { // Optimization to handle delete sub-dir and keys to remove quickly // This case will be useful to handle when depth of directory is high @@ -461,8 +459,7 @@ private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest( } private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List requests, - String snapTableKey, UUID expectedPreviousSnapshotId, Map bucketNameInfoMap) - throws InterruptedException { + String snapTableKey, UUID expectedPreviousSnapshotId, Map bucketNameInfoMap) { OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest = OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder(); @@ -489,7 +486,7 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List> totalExclusiveSizeMap, long runCount) throws InterruptedException { + Map> totalExclusiveSizeMap, long runCount) { OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock(); String snapshotTableKey = currentSnapshotInfo == null ? null : currentSnapshotInfo.getTableKey(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 254184f8b824..75019adf7ec5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundTask; -import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; import org.apache.hadoop.hdds.utils.db.Table; @@ -387,7 +386,7 @@ private Pair, Boolean> submitPurgeKeysRequest( bucketPurgeKeysSizeMap.values().stream().map(BucketPurgeSize::toProtobuf) .forEach(requestBuilder::addBucketPurgeKeysSize); bucketPurgeKeysSizeMap.clear(); - purgeSuccess = submitPurgeRequest(snapTableKey, purgeSuccess, requestBuilder); + purgeSuccess = submitPurgeRequest(purgeSuccess, requestBuilder); requestBuilder = getPurgeKeysRequest(snapTableKey, expectedPreviousSnapshotId); currSize = baseSize; } @@ -418,19 +417,18 @@ private static PurgeKeysRequest.Builder getPurgeKeysRequest(String snapTableKey, return requestBuilder; } - private boolean submitPurgeRequest(String snapTableKey, boolean purgeSuccess, - PurgeKeysRequest.Builder requestBuilder) { + private boolean submitPurgeRequest(boolean purgeSuccess, PurgeKeysRequest.Builder requestBuilder) { OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys) .setPurgeKeysRequest(requestBuilder.build()).setClientId(getClientId().toString()).build(); - try (Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { + try { OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest); if (omResponse != null) { purgeSuccess = purgeSuccess && omResponse.getSuccess(); } - } catch (ServiceException | InterruptedException e) { + } catch (ServiceException e) { LOG.error("PurgeKey request failed in batch. Will retry at next run.", e); purgeSuccess = false; // Continue to next batch instead of returning immediately @@ -466,9 +464,9 @@ private void resetMetrics() { } @Override - public BackgroundTaskQueue getTasks() { + public DeletingServiceTaskQueue getTasks() { resetMetrics(); - BackgroundTaskQueue queue = new BackgroundTaskQueue(); + DeletingServiceTaskQueue queue = new DeletingServiceTaskQueue(); queue.add(new KeyDeletingTask(null)); if (deepCleanSnapshots) { Iterator iterator = null; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index ab40a0530fce..07f11cbe593a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -39,11 +39,9 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.BackgroundTask; -import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.ClientVersion; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; @@ -231,7 +229,7 @@ public BackgroundTaskResult call() throws InterruptedException { return BackgroundTaskResult.EmptyTaskResult.newResult(); } - private void submitSnapshotPurgeRequest(List purgeSnapshotKeys) throws InterruptedException { + private void submitSnapshotPurgeRequest(List purgeSnapshotKeys) { if (!purgeSnapshotKeys.isEmpty()) { SnapshotPurgeRequest snapshotPurgeRequest = SnapshotPurgeRequest .newBuilder() @@ -244,16 +242,14 @@ private void submitSnapshotPurgeRequest(List purgeSnapshotKeys) throws I .setClientId(clientId.toString()) .build(); - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { - submitOMRequest(omRequest); - } + submitOMRequest(omRequest); } } private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, List deletedKeys, List renamedList, - List dirsToMove) throws InterruptedException { + List dirsToMove) { SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = SnapshotMoveTableKeysRequest.newBuilder() .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId())); @@ -282,9 +278,7 @@ private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, .setSnapshotMoveTableKeysRequest(moveDeletedKeys) .setClientId(clientId.toString()) .build(); - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { - submitOMRequest(omRequest); - } + submitOMRequest(omRequest); } private void submitOMRequest(OMRequest omRequest) { @@ -313,8 +307,8 @@ boolean shouldIgnoreSnapshot(SnapshotInfo snapInfo) throws IOException { } @Override - public BackgroundTaskQueue getTasks() { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); + public DeletingServiceTaskQueue getTasks() { + DeletingServiceTaskQueue queue = new DeletingServiceTaskQueue(); queue.add(new SnapshotDeletingTask()); return queue; } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 3dcddaeeafa9..4d136bac17dc 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -71,7 +71,6 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.server.ServerUtils; -import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.db.DBConfigFromFile; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.common.BlockGroup; @@ -658,7 +657,7 @@ public void testKeyDeletingServiceWithDeepCleanedSnapshots() throws Exception { KeyDeletingService kds = Mockito.spy(new KeyDeletingService(ozoneManager, scmBlockTestingClient, 10000, 100000, conf, 10, true)); when(kds.getTasks()).thenAnswer(i -> { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); + AbstractKeyDeletingService.DeletingServiceTaskQueue queue = kds.new DeletingServiceTaskQueue(); for (UUID id : snapshotIds) { queue.add(kds.new KeyDeletingTask(id)); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSstFilteringService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSstFilteringService.java index 108dd30c8222..b25ff5c52e47 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSstFilteringService.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.utils.db.DBProfile; import org.apache.hadoop.hdds.utils.db.RDBStore; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -67,6 +66,7 @@ import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -243,8 +243,7 @@ public void testIrrelevantSstFileDeletion() String snapshotName2 = "snapshot2"; final long count; - try (BootstrapStateHandler.Lock lock = - filteringService.getBootstrapStateLock().lock()) { + try (UncheckedAutoCloseable lock = filteringService.getBootstrapStateLock().acquireWriteLock()) { count = filteringService.getSnapshotFilteredCount().get(); createSnapshot(volumeName, bucketName2, snapshotName2);