From d0d66698a538d6492235cafade09b16ecd873680 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Wed, 8 Oct 2025 23:29:31 +0530 Subject: [PATCH 01/11] HDDS-13772. Snapshot Paths to be re read from om checkpoint db inside lock again. (cherry picked from commit 3650d00e8f49e668b936644df79d859664564d3f) --- ...stOMDbCheckpointServletInodeBasedXfer.java | 55 +++++++++++++++++++ .../OMDBCheckpointServletInodeBasedXfer.java | 8 ++- 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index ec2080e9cf48..7129084a413e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -30,13 +30,17 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; @@ -47,6 +51,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.Sets; import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -223,6 +228,9 @@ public void write(int b) throws IOException { doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any()); doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir(); doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir(); + doCallRealMethod().when(omDbCheckpointServletMock) + .transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap()); + doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(any(), anyBoolean()); } @ParameterizedTest @@ -438,6 +446,53 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { } } + + /** + * SCENARIO: + * 1. Initially: S1->S2->S3 snapshots exist, snapshotPaths = {S1, S2, S3} + * 2. S3 gets purged (deleted from live OM metadata) + * 3. Checkpoint is created (locked point-in-time state without S3) + * 4. Problem: Old code uses stale snapshotPaths {S1, S2, S3} from step 1 + * 5. Solution: Re-read snapshotPaths from checkpoint = {S1, S2} (no S3) + * This test verifies that checkpoint metadata manager (post-fix) excludes + * purged snapshots, while live metadata manager (pre-fix) would include them. + */ + @Test + public void testSnapshotPathsReReadFromCheckpointAfterPurge() throws Exception { + OMDBCheckpointServletInodeBasedXfer servlet = new OMDBCheckpointServletInodeBasedXfer(); + // Create test snapshot paths + Path s1Path = Paths.get("snapshots").resolve("s1"); + Path s2Path = Paths.get("snapshots").resolve("s2"); + Path s3Path = Paths.get("snapshots").resolve("s3"); + // Mock live metadata manager - includes S3 (before purge) + OMMetadataManager liveMetadataManager = mock(OMMetadataManager.class); + Set liveSnapshotPaths = Sets.newHashSet(s1Path, s2Path, s3Path); + // Mock checkpoint metadata manager - S3 purged + OmMetadataManagerImpl checkpointMetadataManager = mock(OmMetadataManagerImpl.class); + Set checkpointSnapshotPaths = Sets.newHashSet(s1Path, s2Path); // No S3! + OMDBCheckpointServletInodeBasedXfer spy = spy(servlet); + // Mock getSnapshotDirs to return different results based on metadata manager type + doAnswer(invocation -> { + OMMetadataManager manager = invocation.getArgument(0); + if (manager == liveMetadataManager) { + return liveSnapshotPaths; // Live manager sees S3 + } else { + return checkpointSnapshotPaths; // Checkpoint manager doesn't see S3 + } + }).when(spy).getSnapshotDirs(any(OMMetadataManager.class)); + + Set liveResult = spy.getSnapshotDirs(liveMetadataManager); + assertEquals(3, liveResult.size()); + assertTrue(liveResult.contains(s3Path), "Live manager should see S3"); + Set checkpointResult = spy.getSnapshotDirs(checkpointMetadataManager); + assertEquals(2, checkpointResult.size()); + assertTrue(checkpointResult.contains(s1Path), "Checkpoint should include S1"); + assertTrue(checkpointResult.contains(s2Path), "Checkpoint should include S2"); + assertFalse(checkpointResult.contains(s3Path), "Checkpoint should NOT include S3 (purged)"); + assertNotEquals(liveResult.size(), checkpointResult.size(), + "Checkpoint and live snapshot paths should differ"); + } + private static void deleteWalFiles(Path snapshotDbDir) throws IOException { try (Stream filesInTarball = Files.list(snapshotDbDir)) { List files = filesInTarball.filter(p -> p.toString().contains(".log")) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index 1acd9593c822..e27ef5e0a4ab 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -250,6 +250,10 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina writeDBToArchive(sstFilesToExclude, checkpointDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false); if (includeSnapshotData) { + // get the list of snapshots from the checkpoint + OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager( + om.getConfiguration(), checkpoint); + snapshotPaths = getSnapshotDirs(checkpointMetadataManager); Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir, @@ -284,7 +288,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication. * @throws IOException if an I/O error occurs during processing. */ - private void transferSnapshotData(Set sstFilesToExclude, Path tmpdir, Set snapshotPaths, + void transferSnapshotData(Set sstFilesToExclude, Path tmpdir, Set snapshotPaths, AtomicLong maxTotalSstSize, ArchiveOutputStream archiveOutputStream, Map hardLinkFileMap) throws IOException { OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); @@ -476,7 +480,7 @@ private boolean writeDBToArchive(Set sstFilesToExclude, Path dbDir, Atom * @return The created database checkpoint. * @throws IOException If an error occurs during checkpoint creation or file copying. */ - private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException { + DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException { // make tmp directories to contain the copies Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); From 3e091469557c9ceccb606a293cae798051bccd78 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Fri, 10 Oct 2025 17:13:42 +0530 Subject: [PATCH 02/11] fix leaks --- .../om/OMDBCheckpointServletInodeBasedXfer.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index e27ef5e0a4ab..45472ee2c61f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -251,9 +251,16 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false); if (includeSnapshotData) { // get the list of snapshots from the checkpoint - OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager( - om.getConfiguration(), checkpoint); - snapshotPaths = getSnapshotDirs(checkpointMetadataManager); + OmMetadataManagerImpl checkpointMetadataManager = null; + try { + checkpointMetadataManager = + OmMetadataManagerImpl.createCheckpointMetadataManager(om.getConfiguration(), checkpoint); + snapshotPaths = getSnapshotDirs(checkpointMetadataManager); + } finally { + if (checkpointMetadataManager != null) { + checkpointMetadataManager.stop(); + } + } Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir, From 6d748e25035494c49d21be558cab2ba6109a9db7 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Tue, 21 Oct 2025 15:12:41 +0530 Subject: [PATCH 03/11] address comment --- ...stOMDbCheckpointServletInodeBasedXfer.java | 142 ++++++++++++------ .../OMDBCheckpointServletInodeBasedXfer.java | 27 +++- 2 files changed, 123 insertions(+), 46 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index 7129084a413e..458d77161958 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -30,7 +30,6 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -40,7 +39,6 @@ import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; @@ -51,7 +49,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Sets; import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -101,6 +98,7 @@ import org.apache.hadoop.ozone.om.codec.OMDBDefinition; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; @@ -231,6 +229,7 @@ public void write(int b) throws IOException { doCallRealMethod().when(omDbCheckpointServletMock) .transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap()); doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(any(), anyBoolean()); + doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any()); } @ParameterizedTest @@ -446,51 +445,103 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { } } - /** - * SCENARIO: - * 1. Initially: S1->S2->S3 snapshots exist, snapshotPaths = {S1, S2, S3} - * 2. S3 gets purged (deleted from live OM metadata) - * 3. Checkpoint is created (locked point-in-time state without S3) - * 4. Problem: Old code uses stale snapshotPaths {S1, S2, S3} from step 1 - * 5. Solution: Re-read snapshotPaths from checkpoint = {S1, S2} (no S3) - * This test verifies that checkpoint metadata manager (post-fix) excludes - * purged snapshots, while live metadata manager (pre-fix) would include them. + * Tests the full checkpoint servlet flow to ensure snapshot paths are read + * from checkpoint metadata (frozen state) rather than live OM metadata (current state). + * Scenario: + * 1. Create snapshots S1, S2 + * 2. Checkpoint is created (freezes state with S1, S2) + * 3. S2 gets purged from live OM (after checkpoint creation) + * 4. Servlet processes checkpoint - should still include S2 data */ @Test - public void testSnapshotPathsReReadFromCheckpointAfterPurge() throws Exception { - OMDBCheckpointServletInodeBasedXfer servlet = new OMDBCheckpointServletInodeBasedXfer(); - // Create test snapshot paths - Path s1Path = Paths.get("snapshots").resolve("s1"); - Path s2Path = Paths.get("snapshots").resolve("s2"); - Path s3Path = Paths.get("snapshots").resolve("s3"); - // Mock live metadata manager - includes S3 (before purge) - OMMetadataManager liveMetadataManager = mock(OMMetadataManager.class); - Set liveSnapshotPaths = Sets.newHashSet(s1Path, s2Path, s3Path); - // Mock checkpoint metadata manager - S3 purged - OmMetadataManagerImpl checkpointMetadataManager = mock(OmMetadataManagerImpl.class); - Set checkpointSnapshotPaths = Sets.newHashSet(s1Path, s2Path); // No S3! - OMDBCheckpointServletInodeBasedXfer spy = spy(servlet); - // Mock getSnapshotDirs to return different results based on metadata manager type - doAnswer(invocation -> { - OMMetadataManager manager = invocation.getArgument(0); - if (manager == liveMetadataManager) { - return liveSnapshotPaths; // Live manager sees S3 - } else { - return checkpointSnapshotPaths; // Checkpoint manager doesn't see S3 - } - }).when(spy).getSnapshotDirs(any(OMMetadataManager.class)); - - Set liveResult = spy.getSnapshotDirs(liveMetadataManager); - assertEquals(3, liveResult.size()); - assertTrue(liveResult.contains(s3Path), "Live manager should see S3"); - Set checkpointResult = spy.getSnapshotDirs(checkpointMetadataManager); - assertEquals(2, checkpointResult.size()); - assertTrue(checkpointResult.contains(s1Path), "Checkpoint should include S1"); - assertTrue(checkpointResult.contains(s2Path), "Checkpoint should include S2"); - assertFalse(checkpointResult.contains(s3Path), "Checkpoint should NOT include S3 (purged)"); - assertNotEquals(liveResult.size(), checkpointResult.size(), - "Checkpoint and live snapshot paths should differ"); + public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { + String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5); + String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5); + + setupCluster(); + om.getKeyManager().getSnapshotSstFilteringService().pause(); + + // Create test data and snapshots + OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName); + + // Create key before first snapshot + TestDataUtil.createKey(bucket, "key1", + ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), + "data1".getBytes(StandardCharsets.UTF_8)); + client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot1"); + + // Create key before second snapshot + TestDataUtil.createKey(bucket, "key2", + ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), + "data2".getBytes(StandardCharsets.UTF_8)); + client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2"); + om.getMetadataManager().getStore().flushDB(); + + // At this point: Live OM has snapshots S1, S2 + List snapshotsBeforePurge = new ArrayList<>(); + client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) + .forEachRemaining(snapshotsBeforePurge::add); + assertEquals(2, snapshotsBeforePurge.size(), "Should have 2 snapshots initially"); + OzoneSnapshot snapshot2 = snapshotsBeforePurge.stream() + .filter(snap -> snap.getName().equals("snapshot2")) + .findFirst() + .orElseThrow(() -> new RuntimeException("snapshot2 not found")); + + // Setup servlet mocks for checkpoint processing + setupMocks(); + when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true"); + + // Create a checkpoint that captures current state (S1, S2) + DBStore dbStore = om.getMetadataManager().getStore(); + DBStore spyDbStore = spy(dbStore); + AtomicReference capturedCheckpoint = new AtomicReference<>(); + + when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> { + DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); + doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification + capturedCheckpoint.set(checkpoint); + return checkpoint; + }); + + // Initialize servlet + doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(), + eq(false), any(), any(), eq(false)); + omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(), + false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false); + when(responseMock.getOutputStream()).thenReturn(servletOutputStream); + // Process checkpoint servlet - this is where old vs new code differs + omDbCheckpointServletMock.doGet(requestMock, responseMock); + // Purge snapshot2 from live OM AFTER checkpoint creation but BEFORE servlet processing + String snapshot2TableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot2.getName()); + // Simulate the purge operation that removes from snapshotInfoTable + om.getMetadataManager().getSnapshotInfoTable().delete(snapshot2TableKey); + om.getMetadataManager().getStore().flushDB(); + + // Verify live OM now only sees 1 snapshot + List snapshotsAfterPurge = new ArrayList<>(); + // simulating a purge here by only adding active snapshots + client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) + .forEachRemaining(snapshotsAfterPurge::add); + assertEquals(1, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge"); + // Extract tarball and verify contents + String testDirName = folder.resolve("testDir").toString(); + String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; + File newDbDir = new File(newDbDirName); + assertTrue(newDbDir.mkdirs()); + FileUtil.unTar(tempFile, newDbDir); + OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true); + Path snapshot2DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, + OM_DB_NAME + "-" + snapshot2.getSnapshotId()); + boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir); + // The critical assertion: checkpoint should include snapshot2 data + // even though it was purged from live OM after checkpoint creation + assertTrue(snapshot2IncludedInCheckpoint, + "Checkpoint should include snapshot2 data even though it was purged from live OM."); + // Cleanup + if (capturedCheckpoint.get() != null) { + capturedCheckpoint.get().cleanupCheckpoint(); + } } private static void deleteWalFiles(Path snapshotDbDir) throws IOException { @@ -555,6 +606,7 @@ private void setupClusterAndMocks(String volumeName, String bucketName, // Init the mock with the spyDbstore doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(), eq(false), any(), any(), eq(false)); + doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any()); omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(), false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index 45472ee2c61f..44dc8e067bbb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hdds.recon.ReconConfig; import org.apache.hadoop.hdds.utils.DBCheckpointServlet; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; @@ -255,7 +257,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina try { checkpointMetadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(om.getConfiguration(), checkpoint); - snapshotPaths = getSnapshotDirs(checkpointMetadataManager); + snapshotPaths = getSnapshotDirsFromDB(checkpointMetadataManager); } finally { if (checkpointMetadataManager != null) { checkpointMetadataManager.stop(); @@ -406,6 +408,29 @@ Set getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio return snapshotPaths; } + /** + * Collects paths to all snapshot databases from the OM DB. + * + * @param omMetadataManager OMMetadataManager instance + * @return Set of paths to snapshot databases + * @throws IOException if an I/O error occurs + */ + Set getSnapshotDirsFromDB(OMMetadataManager omMetadataManager) throws IOException { + Set snapshotPaths = new HashSet<>(); + try (TableIterator> iter = + omMetadataManager.getSnapshotInfoTable().iterator()) { + while (iter.hasNext()) { + Table.KeyValue kv = iter.next(); + SnapshotInfo snapshotInfo = kv.getValue(); + String snapshotDir = OmSnapshotManager.getSnapshotPath(getConf(), + snapshotInfo.getCheckpointDirName()); + Path path = Paths.get(snapshotDir); + snapshotPaths.add(path); + } + } + return snapshotPaths; + } + /** * Writes database files to the archive, handling deduplication based on inode IDs. * Here the dbDir could either be a snapshot db directory, the active om.db, From 1aa7867c5de404fe844221ff8cdf4bb3760d68e1 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Fri, 24 Oct 2025 00:23:07 +0530 Subject: [PATCH 04/11] fix compile --- .../ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java | 2 +- .../hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index 625588008916..2fe9378970da 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -239,7 +239,7 @@ public void write(int b) throws IOException { doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir(); doCallRealMethod().when(omDbCheckpointServletMock) .transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap()); - doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(any(), anyBoolean()); + doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(anyBoolean()); doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any()); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index d31a29d38665..ba34d8c38cfe 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -518,7 +518,7 @@ private boolean writeDBToArchive(Set sstFilesToExclude, Stream fil * @param flush If true, flushes in-memory data to disk before checkpointing. * @throws IOException If an error occurs during checkpoint creation or file copying. */ - private DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException { + DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException { // Create & return the checkpoint. return getDbStore().getCheckpoint(flush); } From ca862a8105b022cac1ca574ef23c8f6aaf12f70f Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Fri, 24 Oct 2025 00:33:18 +0530 Subject: [PATCH 05/11] fix checkstyle --- .../hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index ba34d8c38cfe..5e48ced2328b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -68,7 +68,6 @@ import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; From d909182847a95e6adbe230d7be6658f3b4cceab3 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 27 Oct 2025 15:31:10 +0530 Subject: [PATCH 06/11] use try with resources --- .../ozone/om/OMDBCheckpointServletInodeBasedXfer.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index 5e48ced2328b..bb7546b2ae90 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -265,15 +265,9 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false); if (includeSnapshotData) { // get the list of snapshots from the checkpoint - OmMetadataManagerImpl checkpointMetadataManager = null; - try { - checkpointMetadataManager = - OmMetadataManagerImpl.createCheckpointMetadataManager(om.getConfiguration(), checkpoint); + try (OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl + .createCheckpointMetadataManager(om.getConfiguration(), checkpoint)) { snapshotPaths = getSnapshotDirsFromDB(checkpointMetadataManager); - } finally { - if (checkpointMetadataManager != null) { - checkpointMetadataManager.stop(); - } } writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false); From 7f14170d3f979fa89d793841c4c4a769d2279bad Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Mon, 27 Oct 2025 17:29:46 +0530 Subject: [PATCH 07/11] address comments --- ...stOMDbCheckpointServletInodeBasedXfer.java | 31 +++++++++++++------ .../OMDBCheckpointServletInodeBasedXfer.java | 21 +------------ 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index 2fe9378970da..46252c82af39 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -231,7 +231,6 @@ public void write(int b) throws IOException { .thenReturn(lock); doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getBootstrapTempData()); - doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirs(any()); doCallRealMethod().when(omDbCheckpointServletMock). processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any()); @@ -598,9 +597,10 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception { * from checkpoint metadata (frozen state) rather than live OM metadata (current state). * Scenario: * 1. Create snapshots S1, S2 - * 2. Checkpoint is created (freezes state with S1, S2) - * 3. S2 gets purged from live OM (after checkpoint creation) - * 4. Servlet processes checkpoint - should still include S2 data + * 2. Create Snapshot S3 + * 3. Checkpoint is created (freezes state with S1, S2) + * 4. S2 gets purged from live OM (after checkpoint creation) + * 5. Servlet processes checkpoint - should still include S2 data */ @Test public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { @@ -624,17 +624,25 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), "data2".getBytes(StandardCharsets.UTF_8)); client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2"); + TestDataUtil.createKey(bucket, "key3", + ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), + "data3".getBytes(StandardCharsets.UTF_8)); + client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3"); om.getMetadataManager().getStore().flushDB(); // At this point: Live OM has snapshots S1, S2 List snapshotsBeforePurge = new ArrayList<>(); client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshotsBeforePurge::add); - assertEquals(2, snapshotsBeforePurge.size(), "Should have 2 snapshots initially"); + assertEquals(3, snapshotsBeforePurge.size(), "Should have 3 snapshots initially"); OzoneSnapshot snapshot2 = snapshotsBeforePurge.stream() .filter(snap -> snap.getName().equals("snapshot2")) .findFirst() .orElseThrow(() -> new RuntimeException("snapshot2 not found")); + OzoneSnapshot snapshot3 = snapshotsBeforePurge.stream() + .filter(snap -> snap.getName().equals("snapshot3")) + .findFirst() + .orElseThrow(() -> new RuntimeException("snapshot2 not found")); // Setup servlet mocks for checkpoint processing setupMocks(); @@ -666,12 +674,12 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { om.getMetadataManager().getSnapshotInfoTable().delete(snapshot2TableKey); om.getMetadataManager().getStore().flushDB(); - // Verify live OM now only sees 1 snapshot + // Verify live OM now only sees 2 snapshots List snapshotsAfterPurge = new ArrayList<>(); // simulating a purge here by only adding active snapshots client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshotsAfterPurge::add); - assertEquals(1, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge"); + assertEquals(2, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge"); // Extract tarball and verify contents String testDirName = folder.resolve("testDir").toString(); String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; @@ -681,11 +689,16 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true); Path snapshot2DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, OM_DB_NAME + "-" + snapshot2.getSnapshotId()); + Path snapshot3DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, + OM_DB_NAME + "-" + snapshot3.getSnapshotId()); boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir); - // The critical assertion: checkpoint should include snapshot2 data - // even though it was purged from live OM after checkpoint creation + boolean snapshot3IncludedInCheckpoint = Files.exists(snapshot3DbDir); + // The critical assertion: checkpoint should include snapshot2 and snapshot3 data + // even though snapshot2 was purged from live OM after checkpoint creation assertTrue(snapshot2IncludedInCheckpoint, "Checkpoint should include snapshot2 data even though it was purged from live OM."); + assertTrue(snapshot3IncludedInCheckpoint, + "Checkpoint should include snapshot3 data"); // Cleanup if (capturedCheckpoint.get() != null) { capturedCheckpoint.get().cleanupCheckpoint(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java index bb7546b2ae90..0c120ba080d6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java @@ -220,7 +220,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina if (!includeSnapshotData) { maxTotalSstSize.set(Long.MAX_VALUE); } else { - snapshotPaths = getSnapshotDirs(omMetadataManager); + snapshotPaths = getSnapshotDirsFromDB(omMetadataManager); } if (sstFilesToExclude.isEmpty()) { @@ -385,25 +385,6 @@ private OzoneConfiguration getConf() { .getConfiguration(); } - /** - * Collects paths to all snapshot databases. - * - * @param omMetadataManager OMMetadataManager instance - * @return Set of paths to snapshot databases - * @throws IOException if an I/O error occurs - */ - Set getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOException { - Set snapshotPaths = new HashSet<>(); - SnapshotChainManager snapshotChainManager = new SnapshotChainManager(omMetadataManager); - for (SnapshotChainInfo snapInfo : snapshotChainManager.getGlobalSnapshotChain().values()) { - String snapshotDir = - OmSnapshotManager.getSnapshotPath(getConf(), SnapshotInfo.getCheckpointDirName(snapInfo.getSnapshotId())); - Path path = Paths.get(snapshotDir); - snapshotPaths.add(path); - } - return snapshotPaths; - } - /** * Collects paths to all snapshot databases from the OM DB. * From a3c74f67f2a5c741432d2a66aa937d9f8140fc51 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Fri, 31 Oct 2025 18:00:34 +0530 Subject: [PATCH 08/11] address comment --- ...stOMDbCheckpointServletInodeBasedXfer.java | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index 46252c82af39..d86bb627c9bd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; @@ -66,6 +67,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -108,6 +110,7 @@ import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -149,6 +152,7 @@ void init() throws Exception { // ensure cache entries are not evicted thereby snapshot db's are not closed conf.setTimeDuration(OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL, 100, TimeUnit.MINUTES); + conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); } @AfterEach @@ -596,11 +600,12 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception { * Tests the full checkpoint servlet flow to ensure snapshot paths are read * from checkpoint metadata (frozen state) rather than live OM metadata (current state). * Scenario: - * 1. Create snapshots S1, S2 - * 2. Create Snapshot S3 - * 3. Checkpoint is created (freezes state with S1, S2) - * 4. S2 gets purged from live OM (after checkpoint creation) - * 5. Servlet processes checkpoint - should still include S2 data + * 1. Create snapshots S1, S2, S3 + * 2. Purge S2 + * 3. Checkpoint is created + * 4. S3 gets purged from live OM (after checkpoint creation) + * 5. Servlet processes checkpoint - should still include S1, S3 data as + * checkpoint snapshotInfoTable has S1 S3 */ @Test public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { @@ -630,7 +635,7 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3"); om.getMetadataManager().getStore().flushDB(); - // At this point: Live OM has snapshots S1, S2 + // At this point: Live OM has snapshots S1, S2 , S3 List snapshotsBeforePurge = new ArrayList<>(); client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshotsBeforePurge::add); @@ -644,11 +649,14 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { .findFirst() .orElseThrow(() -> new RuntimeException("snapshot2 not found")); + // purge snapshot S2 + purgeSnapshot(volumeName, bucketName, snapshot2); + // Setup servlet mocks for checkpoint processing setupMocks(); when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true"); - // Create a checkpoint that captures current state (S1, S2) + // Create a checkpoint that captures current state (S1, S3) DBStore dbStore = om.getMetadataManager().getStore(); DBStore spyDbStore = spy(dbStore); AtomicReference capturedCheckpoint = new AtomicReference<>(); @@ -668,18 +676,14 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { when(responseMock.getOutputStream()).thenReturn(servletOutputStream); // Process checkpoint servlet - this is where old vs new code differs omDbCheckpointServletMock.doGet(requestMock, responseMock); - // Purge snapshot2 from live OM AFTER checkpoint creation but BEFORE servlet processing - String snapshot2TableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot2.getName()); - // Simulate the purge operation that removes from snapshotInfoTable - om.getMetadataManager().getSnapshotInfoTable().delete(snapshot2TableKey); - om.getMetadataManager().getStore().flushDB(); - - // Verify live OM now only sees 2 snapshots + // Purge snapshot3 + purgeSnapshot(volumeName, bucketName, snapshot3); + // Verify live OM now only sees 1 snapshot List snapshotsAfterPurge = new ArrayList<>(); // simulating a purge here by only adding active snapshots client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshotsAfterPurge::add); - assertEquals(2, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge"); + assertEquals(1, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge"); // Extract tarball and verify contents String testDirName = folder.resolve("testDir").toString(); String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; @@ -693,18 +697,31 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { OM_DB_NAME + "-" + snapshot3.getSnapshotId()); boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir); boolean snapshot3IncludedInCheckpoint = Files.exists(snapshot3DbDir); - // The critical assertion: checkpoint should include snapshot2 and snapshot3 data - // even though snapshot2 was purged from live OM after checkpoint creation - assertTrue(snapshot2IncludedInCheckpoint, - "Checkpoint should include snapshot2 data even though it was purged from live OM."); + assertFalse(snapshot2IncludedInCheckpoint, + "Checkpoint should not include snapshot2 as it was purged from live OM and captured in checkpoint."); assertTrue(snapshot3IncludedInCheckpoint, - "Checkpoint should include snapshot3 data"); + "Checkpoint should include snapshot3 data even though purged as checkpoint has not captured the purge"); // Cleanup if (capturedCheckpoint.get() != null) { capturedCheckpoint.get().cleanupCheckpoint(); } } + private void purgeSnapshot(String volumeName, String bucketName, OzoneSnapshot snapshot) + throws IOException, InterruptedException, TimeoutException { + String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot.getName()); + // delete snapshot and wait for snapshot to be purged + client.getObjectStore().deleteSnapshot(volumeName, bucketName, snapshot.getName()); + GenericTestUtils.waitFor(() -> { + try { + return om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey) == null; + } catch (Exception ex) { + LOG.error("Exception while querying snapshot info for key {}", snapshotTableKey, ex); + return false; + } + }, 100, 20_000); + } + private static void deleteWalFiles(Path snapshotDbDir) throws IOException { try (Stream filesInTarball = Files.list(snapshotDbDir)) { List files = filesInTarball.filter(p -> p.toString().contains(".log")) From 30c5a89a749e610b45a4d5456de3f8254e42cb27 Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Fri, 31 Oct 2025 20:54:59 +0530 Subject: [PATCH 09/11] address comment --- ...stOMDbCheckpointServletInodeBasedXfer.java | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index d86bb627c9bd..9da1a8826a78 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -90,6 +90,7 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.Archiver; +import org.apache.hadoop.hdds.utils.DBCheckpointServlet; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.DBStore; @@ -169,7 +170,7 @@ private void setupCluster() throws Exception { om = cluster.getOzoneManager(); } - private void setupMocks() throws Exception { + private void setupMocks(boolean useNoOpBootstrapLock) throws Exception { final Path tempPath = folder.resolve("temp" + COUNTER.incrementAndGet() + ".tar"); tempFile = tempPath.toFile(); @@ -244,6 +245,12 @@ public void write(int b) throws IOException { .transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap()); doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(anyBoolean()); doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any()); + if (useNoOpBootstrapLock) { + // Override the lock to be a no-op so purgeSnapshot can work inside the callback + BootstrapStateHandler.Lock noOpLock = new DBCheckpointServlet.Lock(); + when(omDbCheckpointServletMock.getBootstrapStateLock()) + .thenReturn(noOpLock); + } } @ParameterizedTest @@ -407,7 +414,7 @@ public void testSnapshotDBConsistency() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { - setupMocks(); + setupMocks(false); Path dbDir = folder.resolve("db_data"); Files.createDirectories(dbDir); // Create dummy files: one SST, one non-SST @@ -632,28 +639,19 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { TestDataUtil.createKey(bucket, "key3", ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), "data3".getBytes(StandardCharsets.UTF_8)); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3"); - om.getMetadataManager().getStore().flushDB(); // At this point: Live OM has snapshots S1, S2 , S3 - List snapshotsBeforePurge = new ArrayList<>(); + List snapshots = new ArrayList<>(); client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) - .forEachRemaining(snapshotsBeforePurge::add); - assertEquals(3, snapshotsBeforePurge.size(), "Should have 3 snapshots initially"); - OzoneSnapshot snapshot2 = snapshotsBeforePurge.stream() + .forEachRemaining(snapshots::add); + assertEquals(2, snapshots.size(), "Should have 2 snapshots initially"); + OzoneSnapshot snapshot2 = snapshots.stream() .filter(snap -> snap.getName().equals("snapshot2")) .findFirst() .orElseThrow(() -> new RuntimeException("snapshot2 not found")); - OzoneSnapshot snapshot3 = snapshotsBeforePurge.stream() - .filter(snap -> snap.getName().equals("snapshot3")) - .findFirst() - .orElseThrow(() -> new RuntimeException("snapshot2 not found")); - - // purge snapshot S2 - purgeSnapshot(volumeName, bucketName, snapshot2); // Setup servlet mocks for checkpoint processing - setupMocks(); + setupMocks(true); when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true"); // Create a checkpoint that captures current state (S1, S3) @@ -662,6 +660,12 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { AtomicReference capturedCheckpoint = new AtomicReference<>(); when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> { + // Purge snapshot2 before checkpoint + purgeSnapshot(volumeName, bucketName, snapshot2); + // create snapshot 3 before checkpoint + client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3"); + // Also wait for double buffer to flush to ensure all transactions are committed + om.awaitDoubleBufferFlush(); DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification capturedCheckpoint.set(checkpoint); @@ -674,16 +678,16 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(), false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false); when(responseMock.getOutputStream()).thenReturn(servletOutputStream); - // Process checkpoint servlet - this is where old vs new code differs + // Process checkpoint servlet omDbCheckpointServletMock.doGet(requestMock, responseMock); - // Purge snapshot3 - purgeSnapshot(volumeName, bucketName, snapshot3); - // Verify live OM now only sees 1 snapshot - List snapshotsAfterPurge = new ArrayList<>(); - // simulating a purge here by only adding active snapshots + snapshots.clear(); client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) - .forEachRemaining(snapshotsAfterPurge::add); - assertEquals(1, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge"); + .forEachRemaining(snapshots::add); + assertEquals(2, snapshots.size(), "Should have 2 snapshots"); + OzoneSnapshot snapshot3 = snapshots.stream() + .filter(snap -> snap.getName().equals("snapshot3")) + .findFirst() + .orElseThrow(() -> new RuntimeException("snapshot2 not found")); // Extract tarball and verify contents String testDirName = folder.resolve("testDir").toString(); String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; @@ -765,7 +769,7 @@ private void writeDummyKeyToDeleteTableOfSnapshotDB(OzoneSnapshot snapshotToModi private void setupClusterAndMocks(String volumeName, String bucketName, AtomicReference realCheckpoint, boolean includeSnapshots) throws Exception { setupCluster(); - setupMocks(); + setupMocks(false); om.getKeyManager().getSnapshotSstFilteringService().pause(); when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)) .thenReturn(String.valueOf(includeSnapshots)); From fe2413790da534374d01e5c2ecfe8564a178ebbb Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Sat, 1 Nov 2025 20:53:37 +0530 Subject: [PATCH 10/11] address comment --- ...stOMDbCheckpointServletInodeBasedXfer.java | 91 ++++++++++++++----- .../om/ratis/OzoneManagerDoubleBuffer.java | 29 ++++++ 2 files changed, 96 insertions(+), 24 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index 9da1a8826a78..381a3ba88aa0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -64,6 +64,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -90,10 +91,11 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.Archiver; -import org.apache.hadoop.hdds.utils.DBCheckpointServlet; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.TestDataUtil; @@ -153,7 +155,7 @@ void init() throws Exception { // ensure cache entries are not evicted thereby snapshot db's are not closed conf.setTimeDuration(OMConfigKeys.OZONE_OM_SNAPSHOT_CACHE_CLEANUP_SERVICE_RUN_INTERVAL, 100, TimeUnit.MINUTES); - conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS); } @AfterEach @@ -170,7 +172,7 @@ private void setupCluster() throws Exception { om = cluster.getOzoneManager(); } - private void setupMocks(boolean useNoOpBootstrapLock) throws Exception { + private void setupMocks() throws Exception { final Path tempPath = folder.resolve("temp" + COUNTER.incrementAndGet() + ".tar"); tempFile = tempPath.toFile(); @@ -245,12 +247,6 @@ public void write(int b) throws IOException { .transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap()); doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(anyBoolean()); doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any()); - if (useNoOpBootstrapLock) { - // Override the lock to be a no-op so purgeSnapshot can work inside the callback - BootstrapStateHandler.Lock noOpLock = new DBCheckpointServlet.Lock(); - when(omDbCheckpointServletMock.getBootstrapStateLock()) - .thenReturn(noOpLock); - } } @ParameterizedTest @@ -414,7 +410,7 @@ public void testSnapshotDBConsistency() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { - setupMocks(false); + setupMocks(); Path dbDir = folder.resolve("db_data"); Files.createDirectories(dbDir); // Create dummy files: one SST, one non-SST @@ -630,17 +626,9 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), "data1".getBytes(StandardCharsets.UTF_8)); client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot1"); - - // Create key before second snapshot - TestDataUtil.createKey(bucket, "key2", - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "data2".getBytes(StandardCharsets.UTF_8)); client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2"); - TestDataUtil.createKey(bucket, "key3", - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "data3".getBytes(StandardCharsets.UTF_8)); - // At this point: Live OM has snapshots S1, S2 , S3 + // At this point: Live OM has snapshots S1, S2 List snapshots = new ArrayList<>(); client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshots::add); @@ -650,9 +638,45 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { .findFirst() .orElseThrow(() -> new RuntimeException("snapshot2 not found")); + purgeSnapshot(volumeName, bucketName, snapshot2); + // Setup servlet mocks for checkpoint processing - setupMocks(true); + setupMocks(); when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true"); + // custom lock because the original lock waits for double buffer flush. + BootstrapStateHandler.Lock customLock = new BootstrapStateHandler.Lock() { + private final List serviceLocks; + + { + OMDBCheckpointServlet.Lock realLock = new OMDBCheckpointServlet.Lock(om); + serviceLocks = Stream.of( + om.getKeyManager().getDeletingService(), + om.getKeyManager().getDirDeletingService(), + om.getKeyManager().getSnapshotSstFilteringService(), + om.getKeyManager().getSnapshotDeletingService(), + om.getMetadataManager().getStore().getRocksDBCheckpointDiffer() + ) + .filter(Objects::nonNull) + .map(BootstrapStateHandler::getBootstrapStateLock) + .collect(Collectors.toList()); + } + + @Override + public BootstrapStateHandler.Lock lock() throws InterruptedException { + for (BootstrapStateHandler.Lock lock : serviceLocks) { + lock.lock(); + } + // Skip awaitDoubleBufferFlush() + return this; + } + + @Override + public void unlock() { + serviceLocks.forEach(BootstrapStateHandler.Lock::unlock); + } + }; + + when(omDbCheckpointServletMock.getBootstrapStateLock()).thenReturn(customLock); // Create a checkpoint that captures current state (S1, S3) DBStore dbStore = om.getMetadataManager().getStore(); @@ -661,8 +685,8 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> { // Purge snapshot2 before checkpoint - purgeSnapshot(volumeName, bucketName, snapshot2); // create snapshot 3 before checkpoint + om.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer().unpause(); client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3"); // Also wait for double buffer to flush to ensure all transactions are committed om.awaitDoubleBufferFlush(); @@ -713,17 +737,36 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { private void purgeSnapshot(String volumeName, String bucketName, OzoneSnapshot snapshot) throws IOException, InterruptedException, TimeoutException { + // pauses the double buffer thread, waits for snapshot to be in deleted state + // and waits for SDS to pick up and purge the deleted snapshot. + om.awaitDoubleBufferFlush(); + om.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer().pause(); String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot.getName()); // delete snapshot and wait for snapshot to be purged client.getObjectStore().deleteSnapshot(volumeName, bucketName, snapshot.getName()); + GenericTestUtils.waitFor(() -> { + try { + CacheValue cacheValue = + om.getMetadataManager().getSnapshotInfoTable().getCacheValue(new CacheKey<>(snapshotTableKey)); + SnapshotInfo snapshotInfo = cacheValue != null ? cacheValue.getCacheValue() : null; + return snapshotInfo != null && + snapshotInfo.getSnapshotStatus().name().equals(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED.name()); + } catch (Exception ex) { + LOG.error("Exception while querying snapshot info for key in cache {}", snapshotTableKey, ex); + return false; + } + }, 100, 30_000); GenericTestUtils.waitFor(() -> { try { return om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey) == null; } catch (Exception ex) { - LOG.error("Exception while querying snapshot info for key {}", snapshotTableKey, ex); + LOG.error("Exception while querying snapshot info for key in cache {}", snapshotTableKey, ex); return false; } - }, 100, 20_000); + }, 100, 40_000); + // Since DoubleBuffer thread is paused these are not committed to DB + assertNotNull(om.getMetadataManager() + .getSnapshotInfoTable().getSkipCache(snapshotTableKey)); } private static void deleteWalFiles(Path snapshotDbDir) throws IOException { @@ -769,7 +812,7 @@ private void writeDummyKeyToDeleteTableOfSnapshotDB(OzoneSnapshot snapshotToModi private void setupClusterAndMocks(String volumeName, String bucketName, AtomicReference realCheckpoint, boolean includeSnapshots) throws Exception { setupCluster(); - setupMocks(false); + setupMocks(); om.getKeyManager().getSnapshotSstFilteringService().pause(); when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)) .thenReturn(String.valueOf(includeSnapshots)); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 04515dcd728a..c2213e6f74c2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -83,6 +83,7 @@ public final class OzoneManagerDoubleBuffer { private final Daemon daemon; /** Is the {@link #daemon} running? */ private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicBoolean isPaused = new AtomicBoolean(false); /** Notify flush operations are completed by the {@link #daemon}. */ private final FlushNotifier flushNotifier; @@ -277,6 +278,18 @@ private void addToBatchTransactionInfoWithTrace(String parentName, @VisibleForTesting public void flushTransactions() { while (isRunning.get() && canFlush()) { + // Check if paused + synchronized (this) { + while (isPaused.get() && isRunning.get()) { + try { + this.wait(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + } + } + flushCurrentBuffer(); } } @@ -509,6 +522,22 @@ public void stopDaemon() { } } + @VisibleForTesting + public void pause() { + synchronized (this) { + isPaused.set(true); + this.notifyAll(); + } + } + + @VisibleForTesting + public void unpause() { + synchronized (this) { + isPaused.set(false); + this.notifyAll(); + } + } + private void terminate(Throwable t, int status) { terminate(t, status, null); } From 4a383715b3d1d5d610066c882fd55ba3cf95eb5a Mon Sep 17 00:00:00 2001 From: Sadanand Shenoy Date: Sun, 2 Nov 2025 12:29:38 +0530 Subject: [PATCH 11/11] remove race condition for purge --- ...stOMDbCheckpointServletInodeBasedXfer.java | 117 +++--------------- .../om/ratis/OzoneManagerDoubleBuffer.java | 29 ----- 2 files changed, 17 insertions(+), 129 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java index 381a3ba88aa0..f2b94182c809 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java @@ -64,11 +64,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -94,8 +92,6 @@ import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.DBStore; -import org.apache.hadoop.hdds.utils.db.cache.CacheKey; -import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.TestDataUtil; @@ -106,14 +102,12 @@ import org.apache.hadoop.ozone.om.codec.OMDBDefinition; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; import org.apache.hadoop.ozone.om.service.KeyDeletingService; import org.apache.hadoop.ozone.om.service.SnapshotDeletingService; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; -import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -603,11 +597,9 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception { * Tests the full checkpoint servlet flow to ensure snapshot paths are read * from checkpoint metadata (frozen state) rather than live OM metadata (current state). * Scenario: - * 1. Create snapshots S1, S2, S3 - * 2. Purge S2 - * 3. Checkpoint is created - * 4. S3 gets purged from live OM (after checkpoint creation) - * 5. Servlet processes checkpoint - should still include S1, S3 data as + * 1. Create snapshots S1 + * 2. create snapshot S2 later just before checkpoint + * 3. Servlet processes checkpoint - should still include S1, S3 data as * checkpoint snapshotInfoTable has S1 S3 */ @Test @@ -626,59 +618,21 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception { ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), "data1".getBytes(StandardCharsets.UTF_8)); client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot1"); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2"); - - // At this point: Live OM has snapshots S1, S2 + // At this point: Live OM has snapshots S1 List snapshots = new ArrayList<>(); client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshots::add); - assertEquals(2, snapshots.size(), "Should have 2 snapshots initially"); - OzoneSnapshot snapshot2 = snapshots.stream() - .filter(snap -> snap.getName().equals("snapshot2")) + assertEquals(1, snapshots.size(), "Should have 1 snapshot initially"); + OzoneSnapshot snapshot1 = snapshots.stream() + .filter(snap -> snap.getName().equals("snapshot1")) .findFirst() - .orElseThrow(() -> new RuntimeException("snapshot2 not found")); - - purgeSnapshot(volumeName, bucketName, snapshot2); + .orElseThrow(() -> new RuntimeException("snapshot1 not found")); // Setup servlet mocks for checkpoint processing setupMocks(); when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true"); - // custom lock because the original lock waits for double buffer flush. - BootstrapStateHandler.Lock customLock = new BootstrapStateHandler.Lock() { - private final List serviceLocks; - - { - OMDBCheckpointServlet.Lock realLock = new OMDBCheckpointServlet.Lock(om); - serviceLocks = Stream.of( - om.getKeyManager().getDeletingService(), - om.getKeyManager().getDirDeletingService(), - om.getKeyManager().getSnapshotSstFilteringService(), - om.getKeyManager().getSnapshotDeletingService(), - om.getMetadataManager().getStore().getRocksDBCheckpointDiffer() - ) - .filter(Objects::nonNull) - .map(BootstrapStateHandler::getBootstrapStateLock) - .collect(Collectors.toList()); - } - @Override - public BootstrapStateHandler.Lock lock() throws InterruptedException { - for (BootstrapStateHandler.Lock lock : serviceLocks) { - lock.lock(); - } - // Skip awaitDoubleBufferFlush() - return this; - } - - @Override - public void unlock() { - serviceLocks.forEach(BootstrapStateHandler.Lock::unlock); - } - }; - - when(omDbCheckpointServletMock.getBootstrapStateLock()).thenReturn(customLock); - - // Create a checkpoint that captures current state (S1, S3) + // Create a checkpoint that captures current state (S1) DBStore dbStore = om.getMetadataManager().getStore(); DBStore spyDbStore = spy(dbStore); AtomicReference capturedCheckpoint = new AtomicReference<>(); @@ -686,8 +640,7 @@ public void unlock() { when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> { // Purge snapshot2 before checkpoint // create snapshot 3 before checkpoint - om.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer().unpause(); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3"); + client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2"); // Also wait for double buffer to flush to ensure all transactions are committed om.awaitDoubleBufferFlush(); DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); @@ -708,8 +661,8 @@ public void unlock() { client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) .forEachRemaining(snapshots::add); assertEquals(2, snapshots.size(), "Should have 2 snapshots"); - OzoneSnapshot snapshot3 = snapshots.stream() - .filter(snap -> snap.getName().equals("snapshot3")) + OzoneSnapshot snapshot2 = snapshots.stream() + .filter(snap -> snap.getName().equals("snapshot2")) .findFirst() .orElseThrow(() -> new RuntimeException("snapshot2 not found")); // Extract tarball and verify contents @@ -719,56 +672,20 @@ public void unlock() { assertTrue(newDbDir.mkdirs()); FileUtil.unTar(tempFile, newDbDir); OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true); + Path snapshot1DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, + OM_DB_NAME + "-" + snapshot1.getSnapshotId()); Path snapshot2DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, OM_DB_NAME + "-" + snapshot2.getSnapshotId()); - Path snapshot3DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, - OM_DB_NAME + "-" + snapshot3.getSnapshotId()); + boolean snapshot1IncludedInCheckpoint = Files.exists(snapshot1DbDir); boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir); - boolean snapshot3IncludedInCheckpoint = Files.exists(snapshot3DbDir); - assertFalse(snapshot2IncludedInCheckpoint, - "Checkpoint should not include snapshot2 as it was purged from live OM and captured in checkpoint."); - assertTrue(snapshot3IncludedInCheckpoint, - "Checkpoint should include snapshot3 data even though purged as checkpoint has not captured the purge"); + assertTrue(snapshot1IncludedInCheckpoint && snapshot2IncludedInCheckpoint, + "Checkpoint should include both snapshot1 and snapshot2 data"); // Cleanup if (capturedCheckpoint.get() != null) { capturedCheckpoint.get().cleanupCheckpoint(); } } - private void purgeSnapshot(String volumeName, String bucketName, OzoneSnapshot snapshot) - throws IOException, InterruptedException, TimeoutException { - // pauses the double buffer thread, waits for snapshot to be in deleted state - // and waits for SDS to pick up and purge the deleted snapshot. - om.awaitDoubleBufferFlush(); - om.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer().pause(); - String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot.getName()); - // delete snapshot and wait for snapshot to be purged - client.getObjectStore().deleteSnapshot(volumeName, bucketName, snapshot.getName()); - GenericTestUtils.waitFor(() -> { - try { - CacheValue cacheValue = - om.getMetadataManager().getSnapshotInfoTable().getCacheValue(new CacheKey<>(snapshotTableKey)); - SnapshotInfo snapshotInfo = cacheValue != null ? cacheValue.getCacheValue() : null; - return snapshotInfo != null && - snapshotInfo.getSnapshotStatus().name().equals(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED.name()); - } catch (Exception ex) { - LOG.error("Exception while querying snapshot info for key in cache {}", snapshotTableKey, ex); - return false; - } - }, 100, 30_000); - GenericTestUtils.waitFor(() -> { - try { - return om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey) == null; - } catch (Exception ex) { - LOG.error("Exception while querying snapshot info for key in cache {}", snapshotTableKey, ex); - return false; - } - }, 100, 40_000); - // Since DoubleBuffer thread is paused these are not committed to DB - assertNotNull(om.getMetadataManager() - .getSnapshotInfoTable().getSkipCache(snapshotTableKey)); - } - private static void deleteWalFiles(Path snapshotDbDir) throws IOException { try (Stream filesInTarball = Files.list(snapshotDbDir)) { List files = filesInTarball.filter(p -> p.toString().contains(".log")) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index c2213e6f74c2..04515dcd728a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -83,7 +83,6 @@ public final class OzoneManagerDoubleBuffer { private final Daemon daemon; /** Is the {@link #daemon} running? */ private final AtomicBoolean isRunning = new AtomicBoolean(false); - private final AtomicBoolean isPaused = new AtomicBoolean(false); /** Notify flush operations are completed by the {@link #daemon}. */ private final FlushNotifier flushNotifier; @@ -278,18 +277,6 @@ private void addToBatchTransactionInfoWithTrace(String parentName, @VisibleForTesting public void flushTransactions() { while (isRunning.get() && canFlush()) { - // Check if paused - synchronized (this) { - while (isPaused.get() && isRunning.get()) { - try { - this.wait(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - } - } - flushCurrentBuffer(); } } @@ -522,22 +509,6 @@ public void stopDaemon() { } } - @VisibleForTesting - public void pause() { - synchronized (this) { - isPaused.set(true); - this.notifyAll(); - } - } - - @VisibleForTesting - public void unpause() { - synchronized (this) { - isPaused.set(false); - this.notifyAll(); - } - } - private void terminate(Throwable t, int status) { terminate(t, status, null); }