Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doCallRealMethod;
Expand Down Expand Up @@ -99,6 +101,7 @@
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
Expand Down Expand Up @@ -234,6 +237,10 @@ public void write(int b) throws IOException {
doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any());
doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir();
doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir();
doCallRealMethod().when(omDbCheckpointServletMock)
.transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap());
doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(anyBoolean());
doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any());
}

@ParameterizedTest
Expand Down Expand Up @@ -586,6 +593,105 @@ public void testBootstrapLockBlocksMultipleServices() throws Exception {
assertTrue(servicesSucceeded.get() > 0, "Services should have succeeded after lock release");
}

/**
* Tests the full checkpoint servlet flow to ensure snapshot paths are read
* from checkpoint metadata (frozen state) rather than live OM metadata (current state).
* Scenario:
* 1. Create snapshots S1, S2
* 2. Checkpoint is created (freezes state with S1, S2)
* 3. S2 gets purged from live OM (after checkpoint creation)
* 4. Servlet processes checkpoint - should still include S2 data
*/
@Test
public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Do we need a mini ozone cluster test case for this? I believe we can do with a unit test case here instead of a full mini ozone cluster test. We can think about moving this test into unit test later

String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5);
String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5);

setupCluster();
om.getKeyManager().getSnapshotSstFilteringService().pause();

// Create test data and snapshots
OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName);

// Create key before first snapshot
TestDataUtil.createKey(bucket, "key1",
ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
"data1".getBytes(StandardCharsets.UTF_8));
client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot1");

// Create key before second snapshot
TestDataUtil.createKey(bucket, "key2",
ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
"data2".getBytes(StandardCharsets.UTF_8));
client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2");
om.getMetadataManager().getStore().flushDB();

// At this point: Live OM has snapshots S1, S2
List<OzoneSnapshot> snapshotsBeforePurge = new ArrayList<>();
client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
.forEachRemaining(snapshotsBeforePurge::add);
assertEquals(2, snapshotsBeforePurge.size(), "Should have 2 snapshots initially");
OzoneSnapshot snapshot2 = snapshotsBeforePurge.stream()
.filter(snap -> snap.getName().equals("snapshot2"))
.findFirst()
.orElseThrow(() -> new RuntimeException("snapshot2 not found"));

// Setup servlet mocks for checkpoint processing
setupMocks();
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");

// Create a checkpoint that captures current state (S1, S2)
DBStore dbStore = om.getMetadataManager().getStore();
DBStore spyDbStore = spy(dbStore);
AtomicReference<DBCheckpoint> capturedCheckpoint = new AtomicReference<>();

when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> {
DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification
capturedCheckpoint.set(checkpoint);
return checkpoint;
});
Comment on lines +640 to +650

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> {
DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification
capturedCheckpoint.set(checkpoint);
return checkpoint;
});
when(spyDbStore.getCheckpoint(eq(true))).thenAnswer(invocation -> {
client.getObjectStore().deleteSnapshot(volumeName, bucketName, "snapshot2");
client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot3");
// wait for snapshot2 to get purged and snapshot3 to get created.
DBCheckpoint checkpoint = spy(Mockito.callRealMethod());
doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification
capturedCheckpoint.set(checkpoint);
return checkpoint;
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sadanand48 Let us purge the snapshot just before we are trying to take a checkpoint which means that we should purge the snapshot in this block

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the checkpoint is taken after the bootstrap lock is acquired so once bootstrap lock is aquired the purge won't be succesful in this block as SDS needs to acquire this lock to do the purge. We need to do it outside the lock itself

@swamirishi swamirishi Oct 31, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do the bootstrap on the follower OM? Bootstrap lock there won't mean anything when this runs on the follower. To get into this condition maybe we should mock bootstrap lock to do a noop here.
Actually what we should do is during BootstrapLock we should pause the double buffer thread delete the snapshot and acquire the bootstrap lock. Now just before the checkpoint we should unpause the double buffer thread and let the snapshot purge get flushed this would do it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// Initialize servlet
doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
eq(false), any(), any(), eq(false));
omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
when(responseMock.getOutputStream()).thenReturn(servletOutputStream);
// Process checkpoint servlet - this is where old vs new code differs
omDbCheckpointServletMock.doGet(requestMock, responseMock);
// Purge snapshot2 from live OM AFTER checkpoint creation but BEFORE servlet processing
String snapshot2TableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot2.getName());
// Simulate the purge operation that removes from snapshotInfoTable
om.getMetadataManager().getSnapshotInfoTable().delete(snapshot2TableKey);
om.getMetadataManager().getStore().flushDB();

// Verify live OM now only sees 1 snapshot
List<OzoneSnapshot> snapshotsAfterPurge = new ArrayList<>();
// simulating a purge here by only adding active snapshots
client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
.forEachRemaining(snapshotsAfterPurge::add);
assertEquals(1, snapshotsAfterPurge.size(), "Should have 1 snapshot after purge");
// Extract tarball and verify contents
String testDirName = folder.resolve("testDir").toString();
String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
File newDbDir = new File(newDbDirName);
assertTrue(newDbDir.mkdirs());
FileUtil.unTar(tempFile, newDbDir);
OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true);
Path snapshot2DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
OM_DB_NAME + "-" + snapshot2.getSnapshotId());
boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir);
// The critical assertion: checkpoint should include snapshot2 data
// even though it was purged from live OM after checkpoint creation
assertTrue(snapshot2IncludedInCheckpoint,
"Checkpoint should include snapshot2 data even though it was purged from live OM.");
// Cleanup
if (capturedCheckpoint.get() != null) {
capturedCheckpoint.get().cleanupCheckpoint();
}
}

private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
Expand Down Expand Up @@ -648,6 +754,7 @@ private void setupClusterAndMocks(String volumeName, String bucketName,
// Init the mock with the spyDbstore
doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
eq(false), any(), any(), eq(false));
doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any());
omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
false,
om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
Expand Down Expand Up @@ -263,6 +264,17 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
writeDBToArchive(sstFilesToExclude, checkpointDir,
maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
if (includeSnapshotData) {
// get the list of snapshots from the checkpoint
OmMetadataManagerImpl checkpointMetadataManager = null;
try {
Comment thread
sadanand48 marked this conversation as resolved.
Outdated
Comment thread
sadanand48 marked this conversation as resolved.
Outdated
checkpointMetadataManager =
OmMetadataManagerImpl.createCheckpointMetadataManager(om.getConfiguration(), checkpoint);
snapshotPaths = getSnapshotDirsFromDB(checkpointMetadataManager);
} finally {
if (checkpointMetadataManager != null) {
checkpointMetadataManager.stop();
}
}
writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, false);
writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
Expand Down Expand Up @@ -295,7 +307,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
* @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication.
* @throws IOException if an I/O error occurs during processing.
*/
private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Set<Path> snapshotPaths,
void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Set<Path> snapshotPaths,
Comment thread
sadanand48 marked this conversation as resolved.
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
Map<String, String> hardLinkFileMap) throws IOException {
OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
Expand Down Expand Up @@ -398,6 +410,29 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
return snapshotPaths;
}

/**
* Collects paths to all snapshot databases from the OM DB.
*
* @param omMetadataManager OMMetadataManager instance
* @return Set of paths to snapshot databases
* @throws IOException if an I/O error occurs
*/
Set<Path> getSnapshotDirsFromDB(OMMetadataManager omMetadataManager) throws IOException {
Comment thread
sadanand48 marked this conversation as resolved.
Set<Path> snapshotPaths = new HashSet<>();
Comment thread
sadanand48 marked this conversation as resolved.
try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iter =
omMetadataManager.getSnapshotInfoTable().iterator()) {
while (iter.hasNext()) {
Table.KeyValue<String, SnapshotInfo> kv = iter.next();
SnapshotInfo snapshotInfo = kv.getValue();
String snapshotDir = OmSnapshotManager.getSnapshotPath(getConf(),
Comment thread
sadanand48 marked this conversation as resolved.
snapshotInfo.getCheckpointDirName());
Path path = Paths.get(snapshotDir);
snapshotPaths.add(path);
}
}
return snapshotPaths;
}

@VisibleForTesting
boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
Expand Down Expand Up @@ -482,7 +517,7 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Stream<Path> fil
* @param flush If true, flushes in-memory data to disk before checkpointing.
* @throws IOException If an error occurs during checkpoint creation or file copying.
*/
private DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException {
DBCheckpoint createAndPrepareCheckpoint(boolean flush) throws IOException {
// Create & return the checkpoint.
return getDbStore().getCheckpoint(flush);
}
Expand Down