diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index ec2080e9cf48..0f5c8bae4b46 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -62,12 +62,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.servlet.ServletConfig; import javax.servlet.ServletContext; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -96,8 +99,12 @@ import org.apache.hadoop.ozone.om.codec.OMDBDefinition; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; +import org.apache.hadoop.ozone.om.service.KeyDeletingService; +import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -110,6 +117,8 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Class used for testing the OM DB Checkpoint provider servlet using inode based transfer logic. @@ -128,6 +137,8 @@ public class TestOMDbCheckpointServletInodeBasedXfer { private ServletOutputStream servletOutputStream; private File tempFile; private static final AtomicInteger COUNTER = new AtomicInteger(); + private static final Logger LOG = + LoggerFactory.getLogger(TestOMDbCheckpointServletInodeBasedXfer.class); @BeforeEach void init() throws Exception { @@ -438,6 +449,143 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { } } + @Test + public void testBootstrapLockCoordination() throws Exception { + // Create mocks for all background services + KeyDeletingService mockDeletingService = mock(KeyDeletingService.class); + DirectoryDeletingService mockDirDeletingService = mock(DirectoryDeletingService.class); + SstFilteringService mockFilteringService = mock(SstFilteringService.class); + SnapshotDeletingService mockSnapshotDeletingService = mock(SnapshotDeletingService.class); + RocksDBCheckpointDiffer mockCheckpointDiffer = mock(RocksDBCheckpointDiffer.class); + // Create mock locks for each service + BootstrapStateHandler.Lock mockDeletingLock = mock(BootstrapStateHandler.Lock.class); + BootstrapStateHandler.Lock mockDirDeletingLock = mock(BootstrapStateHandler.Lock.class); + BootstrapStateHandler.Lock mockFilteringLock = mock(BootstrapStateHandler.Lock.class); + BootstrapStateHandler.Lock mockSnapshotDeletingLock = mock(BootstrapStateHandler.Lock.class); + BootstrapStateHandler.Lock mockCheckpointDifferLock = mock(BootstrapStateHandler.Lock.class); + // Configure service mocks to return their respective locks + when(mockDeletingService.getBootstrapStateLock()).thenReturn(mockDeletingLock); + when(mockDirDeletingService.getBootstrapStateLock()).thenReturn(mockDirDeletingLock); + when(mockFilteringService.getBootstrapStateLock()).thenReturn(mockFilteringLock); + when(mockSnapshotDeletingService.getBootstrapStateLock()).thenReturn(mockSnapshotDeletingLock); + when(mockCheckpointDiffer.getBootstrapStateLock()).thenReturn(mockCheckpointDifferLock); + // Mock KeyManager and its services + KeyManager mockKeyManager = mock(KeyManager.class); + when(mockKeyManager.getDeletingService()).thenReturn(mockDeletingService); + when(mockKeyManager.getDirDeletingService()).thenReturn(mockDirDeletingService); + when(mockKeyManager.getSnapshotSstFilteringService()).thenReturn(mockFilteringService); + when(mockKeyManager.getSnapshotDeletingService()).thenReturn(mockSnapshotDeletingService); + // Mock OMMetadataManager and Store + OMMetadataManager mockMetadataManager = mock(OMMetadataManager.class); + DBStore mockStore = mock(DBStore.class); + when(mockMetadataManager.getStore()).thenReturn(mockStore); + when(mockStore.getRocksDBCheckpointDiffer()).thenReturn(mockCheckpointDiffer); + // Mock OzoneManager + OzoneManager mockOM = mock(OzoneManager.class); + when(mockOM.getKeyManager()).thenReturn(mockKeyManager); + when(mockOM.getMetadataManager()).thenReturn(mockMetadataManager); + // Create the actual Lock instance (this tests the real implementation) + OMDBCheckpointServlet.Lock bootstrapLock = new OMDBCheckpointServlet.Lock(mockOM); + // Test successful lock acquisition + BootstrapStateHandler.Lock result = bootstrapLock.lock(); + // Verify all service locks were acquired + verify(mockDeletingLock).lock(); + verify(mockDirDeletingLock).lock(); + verify(mockFilteringLock).lock(); + verify(mockSnapshotDeletingLock).lock(); + verify(mockCheckpointDifferLock).lock(); + // Verify double buffer flush was called + verify(mockOM).awaitDoubleBufferFlush(); + // Verify the lock returns itself + assertEquals(bootstrapLock, result); + // Test unlock + bootstrapLock.unlock(); + // Verify all service locks were released + verify(mockDeletingLock).unlock(); + verify(mockDirDeletingLock).unlock(); + verify(mockFilteringLock).unlock(); + verify(mockSnapshotDeletingLock).unlock(); + verify(mockCheckpointDifferLock).unlock(); + } + + /** + * Verifies that bootstrap lock acquisition blocks background services during checkpoint creation, + * preventing race conditions between checkpoint and service operations. + */ + @Test + public void testBootstrapLockBlocksMultipleServices() throws Exception { + setupCluster(); + // Initialize servlet + OMDBCheckpointServletInodeBasedXfer servlet = new OMDBCheckpointServletInodeBasedXfer(); + ServletConfig servletConfig = mock(ServletConfig.class); + ServletContext servletContext = mock(ServletContext.class); + when(servletConfig.getServletContext()).thenReturn(servletContext); + when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)).thenReturn(om); + servlet.init(servletConfig); + + BootstrapStateHandler.Lock bootstrapLock = servlet.getBootstrapStateLock(); + // Test multiple services being blocked + CountDownLatch bootstrapAcquired = new CountDownLatch(1); + CountDownLatch allServicesCompleted = new CountDownLatch(3); // 3 background services + AtomicInteger servicesBlocked = new AtomicInteger(0); + AtomicInteger servicesSucceeded = new AtomicInteger(0); + // Checkpoint thread holds bootstrap lock + Thread checkpointThread = new Thread(() -> { + try { + LOG.info("Acquiring bootstrap lock for checkpoint..."); + BootstrapStateHandler.Lock acquired = bootstrapLock.lock(); + bootstrapAcquired.countDown(); + Thread.sleep(3000); // Hold for 3 seconds + LOG.info("Releasing bootstrap lock..."); + acquired.unlock(); + } catch (Exception e) { + fail("Checkpoint failed: " + e.getMessage()); + } + }); + + BiFunction createServiceThread = + (serviceName, service) -> new Thread(() -> { + try { + bootstrapAcquired.await(); + if (service != null) { + LOG.info("{} : Trying to acquire lock...", serviceName); + servicesBlocked.incrementAndGet(); + BootstrapStateHandler.Lock serviceLock = service.getBootstrapStateLock(); + serviceLock.lock(); // Should block! + servicesBlocked.decrementAndGet(); + servicesSucceeded.incrementAndGet(); + LOG.info(" {} : Lock acquired!", serviceName); + serviceLock.unlock(); + } + allServicesCompleted.countDown(); + } catch (Exception e) { + LOG.error("{} failed", serviceName, e); + allServicesCompleted.countDown(); + } + }); + // Start all threads + checkpointThread.start(); + Thread keyDeletingThread = createServiceThread.apply("KeyDeletingService", + om.getKeyManager().getDeletingService()); + Thread dirDeletingThread = createServiceThread.apply("DirectoryDeletingService", + om.getKeyManager().getDirDeletingService()); + Thread snapshotDeletingThread = createServiceThread.apply("SnapshotDeletingService", + om.getKeyManager().getSnapshotDeletingService()); + keyDeletingThread.start(); + dirDeletingThread.start(); + snapshotDeletingThread.start(); + // Wait a bit, then verify multiple services are blocked + Thread.sleep(1000); + int blockedCount = servicesBlocked.get(); + assertTrue(blockedCount > 0, "At least one service should be blocked"); + assertEquals(0, servicesSucceeded.get(), "No services should have succeeded yet"); + // Wait for completion + assertTrue(allServicesCompleted.await(10, TimeUnit.SECONDS)); + // Verify all services eventually succeeded + assertEquals(0, servicesBlocked.get(), "No services should be blocked anymore"); + assertTrue(servicesSucceeded.get() > 0, "Services should have succeeded after lock release"); + } + private static void deleteWalFiles(Path snapshotDbDir) throws IOException { try (Stream filesInTarball = Files.list(snapshotDbDir)) { List files = filesInTarball.filter(p -> p.toString().contains(".log")) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index 1acd9593c822..c67bb2cdbee6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -93,6 +93,7 @@ public class OMDBCheckpointServletInodeBasedXfer extends DBCheckpointServlet { protected static final Logger LOG = LoggerFactory.getLogger(OMDBCheckpointServletInodeBasedXfer.class); private static final long serialVersionUID = 1L; + private transient BootstrapStateHandler.Lock lock; @Override public void init() throws ServletException { @@ -123,6 +124,12 @@ public void init() throws ServletException { allowedUsers, allowedGroups, om.isSpnegoEnabled()); + lock = new OMDBCheckpointServlet.Lock(om); + } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; } @Override