Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
Expand Down Expand Up @@ -96,8 +99,12 @@
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -110,6 +117,8 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class used for testing the OM DB Checkpoint provider servlet using inode based transfer logic.
Expand All @@ -128,6 +137,8 @@ public class TestOMDbCheckpointServletInodeBasedXfer {
private ServletOutputStream servletOutputStream;
private File tempFile;
private static final AtomicInteger COUNTER = new AtomicInteger();
private static final Logger LOG =
LoggerFactory.getLogger(TestOMDbCheckpointServletInodeBasedXfer.class);

@BeforeEach
void init() throws Exception {
Expand Down Expand Up @@ -438,6 +449,143 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
}
}

@Test
public void testBootstrapLockCoordination() throws Exception {
// Create mocks for all background services
KeyDeletingService mockDeletingService = mock(KeyDeletingService.class);
DirectoryDeletingService mockDirDeletingService = mock(DirectoryDeletingService.class);
SstFilteringService mockFilteringService = mock(SstFilteringService.class);
SnapshotDeletingService mockSnapshotDeletingService = mock(SnapshotDeletingService.class);
RocksDBCheckpointDiffer mockCheckpointDiffer = mock(RocksDBCheckpointDiffer.class);
// Create mock locks for each service
BootstrapStateHandler.Lock mockDeletingLock = mock(BootstrapStateHandler.Lock.class);
BootstrapStateHandler.Lock mockDirDeletingLock = mock(BootstrapStateHandler.Lock.class);
BootstrapStateHandler.Lock mockFilteringLock = mock(BootstrapStateHandler.Lock.class);
BootstrapStateHandler.Lock mockSnapshotDeletingLock = mock(BootstrapStateHandler.Lock.class);
BootstrapStateHandler.Lock mockCheckpointDifferLock = mock(BootstrapStateHandler.Lock.class);
// Configure service mocks to return their respective locks
when(mockDeletingService.getBootstrapStateLock()).thenReturn(mockDeletingLock);
when(mockDirDeletingService.getBootstrapStateLock()).thenReturn(mockDirDeletingLock);
when(mockFilteringService.getBootstrapStateLock()).thenReturn(mockFilteringLock);
when(mockSnapshotDeletingService.getBootstrapStateLock()).thenReturn(mockSnapshotDeletingLock);
when(mockCheckpointDiffer.getBootstrapStateLock()).thenReturn(mockCheckpointDifferLock);
// Mock KeyManager and its services
KeyManager mockKeyManager = mock(KeyManager.class);
when(mockKeyManager.getDeletingService()).thenReturn(mockDeletingService);
when(mockKeyManager.getDirDeletingService()).thenReturn(mockDirDeletingService);
when(mockKeyManager.getSnapshotSstFilteringService()).thenReturn(mockFilteringService);
when(mockKeyManager.getSnapshotDeletingService()).thenReturn(mockSnapshotDeletingService);
// Mock OMMetadataManager and Store
OMMetadataManager mockMetadataManager = mock(OMMetadataManager.class);
DBStore mockStore = mock(DBStore.class);
when(mockMetadataManager.getStore()).thenReturn(mockStore);
when(mockStore.getRocksDBCheckpointDiffer()).thenReturn(mockCheckpointDiffer);
// Mock OzoneManager
OzoneManager mockOM = mock(OzoneManager.class);
when(mockOM.getKeyManager()).thenReturn(mockKeyManager);
when(mockOM.getMetadataManager()).thenReturn(mockMetadataManager);
// Create the actual Lock instance (this tests the real implementation)
OMDBCheckpointServlet.Lock bootstrapLock = new OMDBCheckpointServlet.Lock(mockOM);
// Test successful lock acquisition
BootstrapStateHandler.Lock result = bootstrapLock.lock();
// Verify all service locks were acquired
verify(mockDeletingLock).lock();
verify(mockDirDeletingLock).lock();
verify(mockFilteringLock).lock();
verify(mockSnapshotDeletingLock).lock();
verify(mockCheckpointDifferLock).lock();
// Verify double buffer flush was called
verify(mockOM).awaitDoubleBufferFlush();
// Verify the lock returns itself
assertEquals(bootstrapLock, result);
// Test unlock
bootstrapLock.unlock();
// Verify all service locks were released
verify(mockDeletingLock).unlock();
verify(mockDirDeletingLock).unlock();
verify(mockFilteringLock).unlock();
verify(mockSnapshotDeletingLock).unlock();
verify(mockCheckpointDifferLock).unlock();
}

/**
* Verifies that bootstrap lock acquisition blocks background services during checkpoint creation,
* preventing race conditions between checkpoint and service operations.
*/
@Test
public void testBootstrapLockBlocksMultipleServices() throws Exception {
setupCluster();
// Initialize servlet
OMDBCheckpointServletInodeBasedXfer servlet = new OMDBCheckpointServletInodeBasedXfer();
ServletConfig servletConfig = mock(ServletConfig.class);
ServletContext servletContext = mock(ServletContext.class);
when(servletConfig.getServletContext()).thenReturn(servletContext);
when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)).thenReturn(om);
servlet.init(servletConfig);

BootstrapStateHandler.Lock bootstrapLock = servlet.getBootstrapStateLock();
// Test multiple services being blocked
CountDownLatch bootstrapAcquired = new CountDownLatch(1);
CountDownLatch allServicesCompleted = new CountDownLatch(3); // 3 background services
AtomicInteger servicesBlocked = new AtomicInteger(0);
AtomicInteger servicesSucceeded = new AtomicInteger(0);
// Checkpoint thread holds bootstrap lock
Thread checkpointThread = new Thread(() -> {
try {
LOG.info("Acquiring bootstrap lock for checkpoint...");
BootstrapStateHandler.Lock acquired = bootstrapLock.lock();
bootstrapAcquired.countDown();
Thread.sleep(3000); // Hold for 3 seconds
LOG.info("Releasing bootstrap lock...");
acquired.unlock();
} catch (Exception e) {
fail("Checkpoint failed: " + e.getMessage());
}
});

BiFunction<String, BootstrapStateHandler, Thread> createServiceThread =
(serviceName, service) -> new Thread(() -> {
try {
bootstrapAcquired.await();
if (service != null) {
LOG.info("{} : Trying to acquire lock...", serviceName);
servicesBlocked.incrementAndGet();
BootstrapStateHandler.Lock serviceLock = service.getBootstrapStateLock();
serviceLock.lock(); // Should block!
servicesBlocked.decrementAndGet();
servicesSucceeded.incrementAndGet();
LOG.info(" {} : Lock acquired!", serviceName);
serviceLock.unlock();
}
allServicesCompleted.countDown();
} catch (Exception e) {
LOG.error("{} failed", serviceName, e);
allServicesCompleted.countDown();
}
});
// Start all threads
checkpointThread.start();
Thread keyDeletingThread = createServiceThread.apply("KeyDeletingService",
om.getKeyManager().getDeletingService());
Thread dirDeletingThread = createServiceThread.apply("DirectoryDeletingService",
om.getKeyManager().getDirDeletingService());
Thread snapshotDeletingThread = createServiceThread.apply("SnapshotDeletingService",
om.getKeyManager().getSnapshotDeletingService());
keyDeletingThread.start();
dirDeletingThread.start();
snapshotDeletingThread.start();
// Wait a bit, then verify multiple services are blocked
Thread.sleep(1000);
int blockedCount = servicesBlocked.get();
assertTrue(blockedCount > 0, "At least one service should be blocked");
assertEquals(0, servicesSucceeded.get(), "No services should have succeeded yet");
// Wait for completion
assertTrue(allServicesCompleted.await(10, TimeUnit.SECONDS));
// Verify all services eventually succeeded
assertEquals(0, servicesBlocked.get(), "No services should be blocked anymore");
assertTrue(servicesSucceeded.get() > 0, "Services should have succeeded after lock release");
}

private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class OMDBCheckpointServletInodeBasedXfer extends DBCheckpointServlet {
protected static final Logger LOG =
LoggerFactory.getLogger(OMDBCheckpointServletInodeBasedXfer.class);
private static final long serialVersionUID = 1L;
private transient BootstrapStateHandler.Lock lock;

@Override
public void init() throws ServletException {
Expand Down Expand Up @@ -123,6 +124,12 @@ public void init() throws ServletException {
allowedUsers,
allowedGroups,
om.isSpnegoEnabled());
lock = new OMDBCheckpointServlet.Lock(om);
}

@Override
public BootstrapStateHandler.Lock getBootstrapStateLock() {
return lock;
}

@Override
Expand Down