diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 1d47fb72958f..db66fed22fe9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -277,6 +277,15 @@ public final class OzoneConfigKeys { OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s"; // 300s for default + public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT = + "ozone.snapshot.defrag.service.timeout"; + public static final String + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s"; + // TODO: Adjust timeout as needed. + // One concern would be that snapdiff can take a long time. + // If snapdiff wait time is included in the timeout it can make it indeterministic. + // -- So don't wait? Trigger and check later? + public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL = "ozone.snapshot.deleting.service.interval"; public static final String diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index c9064da1781c..cb4490c2c1db 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -520,6 +520,7 @@ public final class OzoneConsts { public static final String OM_SNAPSHOT_DIR = "db.snapshots"; public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "checkpointState"; + public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = "checkpointStateDefragged"; public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "diffState"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 02f1e71bfcf7..cae7d66a3070 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3758,6 +3758,14 @@ Snapshot Deleting Service per run. + + ozone.snapshot.defrag.limit.per.task + 1 + OZONE, PERFORMANCE, OM + The maximum number of snapshots that would be defragmented in + each task run of snapshot defragmentation service. + + ozone.snapshot.filtering.service.interval 1m @@ -3765,6 +3773,13 @@ Time interval of the SST File filtering service from Snapshot. + + ozone.snapshot.defrag.service.interval + -1 + OZONE, PERFORMANCE, OM + Task interval of snapshot defragmentation service. + + ozone.om.snapshot.checkpoint.dir.creation.poll.timeout 20s diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java index e84854cae443..5aa561ba9486 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java @@ -28,7 +28,7 @@ /** * DumpFileWriter using rocksdb sst files. 
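 *
 * A minimal usage sketch of the writer as extended by this patch (the file path and the
 * key/value byte arrays are illustrative only; RocksDB's SstFileWriter requires keys to be
 * added in strictly ascending order):
 *
 *   try (RDBSstFileWriter writer = new RDBSstFileWriter(new File("/tmp/example.sst"))) {
 *     writer.put(key1, value1);   // regular entry
 *     writer.delete(key2);        // deletion tombstone (delete() is new in this patch)
 *   }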
*/ -class RDBSstFileWriter implements Closeable { +public class RDBSstFileWriter implements Closeable { private ManagedSstFileWriter sstFileWriter; private File sstFile; @@ -36,7 +36,7 @@ class RDBSstFileWriter implements Closeable { private ManagedOptions emptyOption = new ManagedOptions(); private final ManagedEnvOptions emptyEnvOptions = new ManagedEnvOptions(); - RDBSstFileWriter(File externalFile) throws RocksDatabaseException { + public RDBSstFileWriter(File externalFile) throws RocksDatabaseException { this.sstFileWriter = new ManagedSstFileWriter(emptyEnvOptions, emptyOption); this.keyCounter = new AtomicLong(0); this.sstFile = externalFile; @@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException { } } + public void delete(byte[] key) throws RocksDatabaseException { + try { + sstFileWriter.delete(key); + keyCounter.incrementAndGet(); + } catch (RocksDBException e) { + closeOnFailure(); + throw new RocksDatabaseException("Failed to delete key (length=" + key.length + + "), sstFile=" + sstFile.getAbsolutePath(), e); + } + } + @Override public void close() throws RocksDatabaseException { if (sstFileWriter != null) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 9eeb69ece3d8..30622c8e36ff 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -237,7 +237,7 @@ public boolean isClosed() { * * @see ManagedCheckpoint */ - final class RocksCheckpoint implements Closeable { + public final class RocksCheckpoint implements Closeable { private final ManagedCheckpoint checkpoint; private RocksCheckpoint() { @@ -609,7 +609,7 @@ public List getLiveFilesMetaData() throws RocksDatabaseExcepti } } - RocksCheckpoint createCheckpoint() { + public RocksCheckpoint createCheckpoint() { return new RocksCheckpoint(); } @@ -660,7 +660,7 @@ public Collection getExtraColumnFamilies() { return Collections.unmodifiableCollection(columnFamilies.values()); } - byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException { + public byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { return db.get().get(family.getHandle(), key); } catch (RocksDBException e) { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 87bc9cb30170..969288ed92c8 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -428,11 +428,22 @@ public final class OMConfigKeys { "ozone.snapshot.deleting.limit.per.task"; public static final int SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT = 10; + // Snapshot defragmentation service configuration + public static final String SNAPSHOT_DEFRAG_LIMIT_PER_TASK = + "ozone.snapshot.defrag.limit.per.task"; + public static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT = 1; + public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL = "ozone.snapshot.filtering.service.interval"; public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s"; + public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL = + 
"ozone.snapshot.defrag.service.interval"; + public static final String + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "-1"; + // TODO: Disabled by default. Do not enable by default until upgrade handling is complete. + public static final String OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT = "ozone.om.snapshot.checkpoint.dir.creation.poll.timeout"; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService.java new file mode 100644 index 000000000000..e27c4fe8aa76 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.ozone.test.LambdaTestUtils.await; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.db.DBProfile; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMStorage; +import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml; +import 
org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.SnapshotDefragService; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * Test SnapshotDefragService functionality using MiniOzoneCluster. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestSnapshotDefragService { + + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotDefragService.class); + + private MiniOzoneCluster cluster; + private OzoneClient client; + private ObjectStore store; + private OzoneManager ozoneManager; + private SnapshotDefragService defragService; + + @BeforeAll + void setup() throws Exception { + // Enable debug logging for SnapshotDefragService + GenericTestUtils.setLogLevel(LoggerFactory.getLogger(SnapshotDefragService.class), Level.DEBUG); + + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 30, TimeUnit.SECONDS); + conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST); + conf.setBoolean(OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); + conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, BucketLayout.OBJECT_STORE.name()); + // TODO: Add SNAPSHOT_DEFRAGMENTATION layout feature version + conf.setInt(OMStorage.TESTING_INIT_LAYOUT_VERSION_KEY, + OMLayoutFeature.DELEGATION_TOKEN_SYMMETRIC_SIGN.layoutVersion()); + + conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, -1); + conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, 3000000); + conf.setInt(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, 5); + + conf.setQuietMode(false); + + // Create MiniOzoneCluster + cluster = MiniOzoneCluster.newBuilder(conf).build(); + cluster.waitForClusterToBeReady(); + + client = cluster.newClient(); + store = client.getObjectStore(); + ozoneManager = cluster.getOzoneManager(); + + // Create SnapshotDefragService for manual triggering + defragService = new SnapshotDefragService( + 10000, // interval + TimeUnit.MILLISECONDS, + 30000, // service timeout + ozoneManager, + conf + ); + } + + @AfterAll + public void cleanup() throws Exception { + if (defragService != null) { + defragService.shutdown(); + } + if (client != null) { + client.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Create keys in a bucket. + */ + private void createKeys(String volumeName, String bucketName, int keyCount) throws Exception { + OzoneVolume volume = store.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < keyCount; i++) { + String keyName = "key-" + i; + String data = RandomStringUtils.randomAlphabetic(100); + + try (OzoneOutputStream outputStream = bucket.createKey(keyName, data.length(), + StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), + java.util.Collections.emptyMap(), java.util.Collections.emptyMap())) { + outputStream.write(data.getBytes()); + } + } + LOG.info("Created {} keys in bucket {}/{}", keyCount, volumeName, bucketName); + } + + /** + * Create a snapshot and wait for it to be available. 
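   * In outline: the pre-existing checkpoint directories are recorded first, createSnapshot is issued,
   * the test waits for the SnapshotInfo table entry to appear, and it then waits for a new checkpoint
   * directory containing DB files (CURRENT, MANIFEST* or *.sst) under db.snapshots/checkpointState.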
+ */ + private void createSnapshot(String volumeName, String bucketName, String snapshotName) + throws Exception { + // Get existing checkpoint directories before creating snapshot + Set existingCheckpoints = getExistingCheckpointDirectories(); + + store.createSnapshot(volumeName, bucketName, snapshotName); + + // Wait for snapshot to be created + GenericTestUtils.waitFor(() -> { + try { + SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName)); + return snapshotInfo != null; + } catch (IOException e) { + return false; + } + }, 1000, 30000); + + // Wait for checkpoint DB to be created + waitForCheckpointDB(snapshotName, existingCheckpoints); + + LOG.info("Created snapshot: {}", snapshotName); + } + + /** + * Get existing checkpoint directories before snapshot creation. + */ + private Set getExistingCheckpointDirectories() { + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState"; + File checkpointParentDir = new File(checkpointStateDir); + + Set existingDirs = new java.util.HashSet<>(); + if (checkpointParentDir.exists()) { + File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory); + if (checkpointDirs != null) { + for (File dir : checkpointDirs) { + existingDirs.add(dir.getName()); + } + } + } + + LOG.debug("Existing checkpoint directories: {}", existingDirs); + return existingDirs; + } + + /** + * Wait for checkpoint DB to be created under checkpointState directory. + */ + private void waitForCheckpointDB(String snapshotName, Set existingCheckpoints) throws Exception { + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState"; + File checkpointParentDir = new File(checkpointStateDir); + + LOG.info("Waiting for new checkpoint DB to be created under: {}", checkpointStateDir); + + GenericTestUtils.waitFor(() -> { + if (!checkpointParentDir.exists()) { + LOG.debug("CheckpointState directory does not exist yet: {}", checkpointStateDir); + return false; + } + + // List all directories in checkpointState + File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory); + if (checkpointDirs == null || checkpointDirs.length == 0) { + LOG.debug("No checkpoint directories found in: {}", checkpointStateDir); + return false; + } + + // Look for new checkpoint directories that weren't there before + for (File checkpointDir : checkpointDirs) { + String dirName = checkpointDir.getName(); + + // Skip if this directory existed before snapshot creation + if (existingCheckpoints.contains(dirName)) { + continue; + } + + // Check if the new directory contains database files + File[] dbFiles = checkpointDir.listFiles(); + if (dbFiles != null && dbFiles.length > 0) { + for (File dbFile : dbFiles) { + if (dbFile.isFile() && (dbFile.getName().endsWith(".sst") || + dbFile.getName().equals("CURRENT") || + dbFile.getName().startsWith("MANIFEST"))) { + LOG.info("New checkpoint DB found for snapshot {} in directory: {}", + snapshotName, checkpointDir.getAbsolutePath()); + return true; + } + } + } + + LOG.debug("New checkpoint directory found but no DB files yet: {}", 
checkpointDir.getAbsolutePath()); + } + + LOG.debug("Waiting for new checkpoint DB files to appear in checkpointState directories"); + return false; + }, 1000, 60000); // Wait up to 60 seconds for checkpoint DB creation + + LOG.info("Checkpoint DB created successfully for snapshot: {}", snapshotName); + } + + /** + * Lists the contents of the db.snapshots directory. + */ + private void printSnapshotDirectoryListing(String description) { + LOG.info("=== {} ===", description); + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String snapshotDir = metadataDir + "/db.snapshots"; + File snapshotsDir = new File(snapshotDir); + + if (!snapshotsDir.exists()) { + LOG.info("Snapshots directory does not exist: {}", snapshotDir); + return; + } + + try (Stream paths = Files.walk(Paths.get(snapshotDir))) { + paths.sorted() + .forEach(path -> { + File file = path.toFile(); + String relativePath = Paths.get(snapshotDir).relativize(path).toString(); + if (file.isDirectory()) { + LOG.info("Directory: {}/", relativePath.isEmpty() ? "." : relativePath); + } else { + LOG.info("File: {} (size: {} bytes)", relativePath, file.length()); + } + }); + } catch (IOException e) { + LOG.error("Error listing snapshot directory: {}", snapshotDir, e); + } + } + + /** + * Mark a snapshot as needing defragmentation by updating its YAML metadata. + * TODO: This is a workaround as the current logic does not + * automatically mark snapshots as needing defragmentation. + * This is not needed. + */ + private void markSnapshotAsNeedingDefragmentation(SnapshotInfo snapshotInfo) throws IOException { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + File yamlFile = new File(snapshotPath + ".yaml"); + + if (yamlFile.exists()) { + OmSnapshotLocalDataYaml yamlData = + OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(), yamlFile); + yamlData.setNeedsDefrag(true); + yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile); + LOG.info("Marked snapshot {} as needing defragmentation", snapshotInfo.getName()); + } else { + LOG.warn("YAML file not found for snapshot {}: {}", snapshotInfo.getName(), yamlFile.getPath()); + } + } + + /** + * Trigger the SnapshotDefragService by starting it and waiting for it to process snapshots. 
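   * In outline: every entry in the snapshotInfoTable is first marked needsDefrag=true in its local-data
   * YAML, the current getSnapshotsDefraggedCount() value is recorded, the service is started, and the
   * test then polls for up to 30 seconds for the counter to move past the recorded value.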
+ */ + private void triggerSnapshotDefragService() throws Exception { + LOG.info("Triggering SnapshotDefragService..."); + + // Mark all snapshots as needing defragmentation first + OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); + try (TableIterator> iterator = + metadataManager.getSnapshotInfoTable().iterator()) { + iterator.seekToFirst(); + while (iterator.hasNext()) { + Table.KeyValue entry = iterator.next(); + SnapshotInfo snapshotInfo = entry.getValue(); + markSnapshotAsNeedingDefragmentation(snapshotInfo); + } + } + + long initialDefragCount = defragService.getSnapshotsDefraggedCount().get(); + LOG.info("Initial defragmented count: {}", initialDefragCount); + + // Start the service + defragService.start(); + + // Wait for the service to process snapshots + try { + await(30000, 1000, () -> { + long currentCount = defragService.getSnapshotsDefraggedCount().get(); + LOG.info("Current defragmented count: {}", currentCount); + return currentCount > initialDefragCount; + }); + } catch (TimeoutException e) { + LOG.warn("Timeout waiting for defragmentation to complete, continuing with test"); + } + + LOG.info("SnapshotDefragService execution completed. Snapshots defragmented: {}", + defragService.getSnapshotsDefraggedCount().get()); + } + + @Test + public void testSnapshotDefragmentation() throws Exception { + String volumeName = "test-volume"; + String bucketName = "test-bucket"; + + // Create volume and bucket + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + // TODO: Test FSO bucket as well, default is LEGACY / OBJECT_STORE + volume.createBucket(bucketName); + + LOG.info("Starting snapshot defragmentation test..."); + + // Print initial state + printSnapshotDirectoryListing("Initial state - no snapshots"); + + // Step 1: Create 2 keys, then create snap-1 + createKeys(volumeName, bucketName, 2); + createSnapshot(volumeName, bucketName, "snap-1"); + printSnapshotDirectoryListing("After creating snap-1 (2 keys)"); + + // Step 2: Create 2 more keys, then create snap-2 + createKeys(volumeName, bucketName, 2); // TODO: This actually overwrites the previous keys + createSnapshot(volumeName, bucketName, "snap-2"); + printSnapshotDirectoryListing("After creating snap-2 (4 keys total)"); + + // Step 3: Create 2 more keys, then create snap-3 + createKeys(volumeName, bucketName, 2); // TODO: This actually overwrites the previous keys + createSnapshot(volumeName, bucketName, "snap-3"); + printSnapshotDirectoryListing("After creating snap-3 (6 keys total)"); + + // Step 4: Trigger SnapshotDefragService + triggerSnapshotDefragService(); + printSnapshotDirectoryListing("After SnapshotDefragService execution"); + + // Verify that the snapshots still exist + SnapshotInfo snap1 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-1")); + SnapshotInfo snap2 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-2")); + SnapshotInfo snap3 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-3")); + + assertNotNull(snap1, "Snapshot snap-1 should exist"); + assertNotNull(snap2, "Snapshot snap-2 should exist"); + assertNotNull(snap3, "Snapshot snap-3 should exist"); + + LOG.info("Test completed successfully"); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 7e76885c49bd..872a99e94b15 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -346,6 +346,12 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId, */ SstFilteringService getSnapshotSstFilteringService(); + /** + * Returns the instance of Snapshot Defrag service. + * @return Background service. + */ + SnapshotDefragService getSnapshotDefragService(); + /** * Returns the instance of Snapshot Deleting service. * @return Background service. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 8c50232355ca..6b6d73355637 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT; @@ -58,6 +60,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; @@ -200,6 +204,7 @@ public class KeyManagerImpl implements KeyManager { private KeyDeletingService keyDeletingService; private SstFilteringService snapshotSstFilteringService; + private SnapshotDefragService snapshotDefragService; private SnapshotDeletingService snapshotDeletingService; private final KeyProviderCryptoExtension kmsProvider; @@ -308,6 +313,11 @@ public void start(OzoneConfiguration configuration) { startSnapshotSstFilteringService(configuration); } + if (snapshotDefragService == null && + ozoneManager.isFilesystemSnapshotEnabled()) { + startSnapshotDefragService(configuration); + } + if (snapshotDeletingService == null && ozoneManager.isFilesystemSnapshotEnabled()) { @@ -391,6 +401,42 @@ public void stopSnapshotSstFilteringService() { } } + /** + * Start the snapshot defrag service if interval is not set to disabled value. 
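   * The service is disabled out of the box (ozone.snapshot.defrag.service.interval defaults to -1).
   * A configuration sketch for enabling it, using the keys added in this patch (values are illustrative):
   *
   *   ozone.snapshot.defrag.service.interval = 1h     (any duration greater than 0 enables the service)
   *   ozone.snapshot.defrag.service.timeout  = 300s   (default)
   *   ozone.snapshot.defrag.limit.per.task   = 1      (default; snapshots processed per task run)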
+ * @param conf + */ + public void startSnapshotDefragService(OzoneConfiguration conf) { + if (isDefragSvcEnabled()) { + long serviceInterval = conf.getTimeDuration( + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = conf.getTimeDuration( + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + snapshotDefragService = + new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS, + serviceTimeout, ozoneManager, conf); + snapshotDefragService.start(); + } else { + LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation will not run periodically."); + } + } + + /** + * Stop the snapshot defrag service if it is running. + */ + public void stopSnapshotDefragService() { + if (snapshotDefragService != null) { + snapshotDefragService.shutdown(); + snapshotDefragService = null; + } else { + LOG.info("SnapshotDefragService is already stopped or not started."); + } + } + private void startCompactionService(OzoneConfiguration configuration, boolean isCompactionServiceEnabled) { if (compactionService == null && isCompactionServiceEnabled) { @@ -417,7 +463,7 @@ KeyProviderCryptoExtension getKMSProvider() { } @Override - public void stop() throws IOException { + public void stop() { if (keyDeletingService != null) { keyDeletingService.shutdown(); keyDeletingService = null; @@ -434,6 +480,10 @@ public void stop() throws IOException { snapshotSstFilteringService.shutdown(); snapshotSstFilteringService = null; } + if (snapshotDefragService != null) { + snapshotDefragService.shutdown(); + snapshotDefragService = null; + } if (snapshotDeletingService != null) { snapshotDeletingService.shutdown(); snapshotDeletingService = null; @@ -448,6 +498,15 @@ public void stop() throws IOException { } } + /** + * Get the SnapshotDefragService instance. 
+ * + * @return SnapshotDefragService instance, or null if not initialized + */ + public SnapshotDefragService getSnapshotDefragService() { + return snapshotDefragService; + } + private OmBucketInfo getBucketInfo(String volumeName, String bucketName) throws IOException { String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); @@ -968,7 +1027,16 @@ public boolean isSstFilteringSvcEnabled() { // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay return serviceInterval > 0; } - + + public boolean isDefragSvcEnabled() { + long serviceInterval = ozoneManager.getConfiguration() + .getTimeDuration(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay + return serviceInterval > 0; + } + @Override public OmMultipartUploadList listMultipartUploads(String volumeName, String bucketName, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java index 1d4fedfacaaf..7598636b816b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java @@ -227,8 +227,10 @@ private final class ConstructSnapshotLocalData extends AbstractConstruct { public Object construct(Node node) { MappingNode mnode = (MappingNode) node; Map nodes = constructMapping(mnode); - UUID snapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID)); - UUID prevSnapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID)); + final String snapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID); + UUID snapId = UUID.fromString(snapIdStr); + final String prevSnapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID); + UUID prevSnapId = prevSnapIdStr != null ? UUID.fromString(prevSnapIdStr) : null; OmSnapshotLocalDataYaml snapshotLocalData = new OmSnapshotLocalDataYaml(snapId, Collections.emptyList(), prevSnapId); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java new file mode 100644 index 000000000000..33a03d321816 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java @@ -0,0 +1,1118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; +import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; +import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background service for defragmenting snapshots in the active snapshot chain. + * When snapshots are taken, they capture the entire OM RocksDB state but may contain + * fragmented data. This service defragments snapshots by creating new compacted + * RocksDB instances with only the necessary data for tracked column families. + *
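 * On disk the defragmented copy is written next to the original checkpoint, e.g. (directory names
 * come from this patch; the per-snapshot directory name is shown schematically):
 *
 *   .../db.snapshots/checkpointState/<snapshot checkpoint dir>            original snapshot DB
 *   .../db.snapshots/checkpointStateDefragged/<snapshot checkpoint dir>   defragmented copy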

+ * The service processes snapshots in the active chain sequentially, starting with + * the first non-defragmented snapshot. For the first snapshot in the chain, it + * performs a full defragmentation by copying all keys. For subsequent snapshots, + * it uses incremental defragmentation based on diffs from the previous defragmented + * snapshot. + */ +public class SnapshotDefragService extends BackgroundService + implements BootstrapStateHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SnapshotDefragService.class); + + // Use only a single thread for snapshot defragmentation to avoid conflicts + private static final int DEFRAG_CORE_POOL_SIZE = 1; + + private static final String CHECKPOINT_STATE_DEFRAGGED_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; + private static final String TEMP_DIFF_DIR = "tempDiffSstFiles"; // TODO: Put this in OzoneConsts? + + private final OzoneManager ozoneManager; + private SnapshotChainManager snapshotChainManager; + private final AtomicLong runCount = new AtomicLong(0); + + // Number of snapshots to be processed in a single iteration + private final long snapshotLimitPerTask; + + private final AtomicLong snapshotsDefraggedCount; + private final AtomicBoolean running; + + private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); + + public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, + OzoneManager ozoneManager, OzoneConfiguration configuration) { + super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, + serviceTimeout, ozoneManager.getThreadNamePrefix()); + this.ozoneManager = ozoneManager; + this.snapshotLimitPerTask = configuration + .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, + SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT); + snapshotsDefraggedCount = new AtomicLong(0); + running = new AtomicBoolean(false); + } + + @Override + public void start() { + running.set(true); + super.start(); + } + + @VisibleForTesting + public void pause() { + running.set(false); + } + + @VisibleForTesting + public void resume() { + running.set(true); + } + + /** + * Checks if rocks-tools native library is available. + */ + private boolean isRocksToolsNativeLibAvailable() { + try { + return ManagedRawSSTFileReader.tryLoadLibrary(); + } catch (Exception e) { + LOG.warn("Failed to check native code availability", e); + return false; + } + } + + /** + * Checks if a snapshot needs defragmentation by examining its YAML metadata. + */ + private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + try { + // Read YAML metadata using the correct API + File yamlFile = new File(snapshotPath + ".yaml"); + OmSnapshotLocalDataYaml yamlData = + OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(), yamlFile); + + // Check if snapshot needs compaction (defragmentation) + boolean needsDefrag = yamlData.getNeedsDefrag(); + LOG.debug("Snapshot {} needsDefragmentation field value: {}", + snapshotInfo.getName(), needsDefrag); + + return needsDefrag; + } catch (IOException e) { + LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed", + snapshotInfo.getName(), e); + return true; + } + } + + /** + * Finds the first active snapshot in the chain that needs defragmentation. 
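   * The chain is walked forward (oldest to newest) via SnapshotChainManager#iterator(false); entries
   * missing from the snapshot info table or marked SNAPSHOT_DELETED are skipped, and the first snapshot
   * whose local-data YAML still reports needsDefrag=true is returned (or null when the whole chain is
   * already defragmented).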
+ */ + private SnapshotInfo findFirstSnapshotNeedingDefrag( + Table snapshotInfoTable) throws IOException { + + LOG.debug("Searching for first snapshot needing defragmentation in active chain"); + + // Use iterator(false) to iterate forward through the snapshot chain + Iterator snapshotIterator = snapshotChainManager.iterator(false); + + while (snapshotIterator.hasNext()) { + UUID snapshotId = snapshotIterator.next(); + String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId); + SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey); + + if (snapshotInfo == null) { + LOG.warn("Snapshot with ID '{}' not found in snapshot info table", snapshotId); + continue; + } + + // Skip deleted snapshots + if (snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) { + LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName()); + continue; + } + + // Check if this snapshot needs defragmentation + if (needsDefragmentation(snapshotInfo)) { + LOG.info("Found snapshot needing defragmentation: {} (ID: {})", + snapshotInfo.getName(), snapshotInfo.getSnapshotId()); + return snapshotInfo; + } + + LOG.debug("Snapshot {} already defragmented, continuing search", + snapshotInfo.getName()); + } + + LOG.debug("No snapshots found needing defragmentation"); + return null; + } + + /** + * Performs full defragmentation for the first snapshot in the chain. + * This is a simplified implementation that demonstrates the concept. + */ + private void performFullDefragmentation(SnapshotInfo snapshotInfo, + OmSnapshot omSnapshot) throws IOException { + + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + // For defraggedDbPath, we need to go up to the parent directory and use checkpointStateDefragged + String parentDir = Paths.get(snapshotPath).getParent().getParent().toString(); + String checkpointDirName = Paths.get(snapshotPath).getFileName().toString(); + String defraggedDbPath = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, checkpointDirName).toString(); + + LOG.info("Starting full defragmentation for snapshot: {} at path: {}", + snapshotInfo.getName(), snapshotPath); + LOG.info("Target defragmented DB path: {}", defraggedDbPath); + + // Create defragmented directory + Files.createDirectories(Paths.get(defraggedDbPath)); + + // TODO: Get snapshot checkpoint DB via SnapshotCache + RDBStore originalStore = (RDBStore) omSnapshot.getMetadataManager().getStore(); + RocksDatabase originalDb = originalStore.getDb(); + + LOG.info("Starting defragmentation process for snapshot: {}", snapshotInfo.getName()); + LOG.info("Original DB path: {}", snapshotPath); + LOG.info("Defragmented DB path: {}", defraggedDbPath); + + // Implement actual RocksDB defragmentation + try { + // 1. 
Create a new RocksDB instance at defraggedDbPath + DBStoreBuilder defraggedDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) + .setName(checkpointDirName) + .setPath(Paths.get(defraggedDbPath).getParent()) + .setCreateCheckpointDirs(false); + + // Add all the tracked column families to the new DB + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + defraggedDbBuilder.addTable(cfName); + LOG.debug("Added column family {} to defragmented DB", cfName); + } + + // Build the new defragmented database + DBStore defraggedStore = defraggedDbBuilder.build(); + RocksDatabase defraggedDb = ((RDBStore) defraggedStore).getDb(); + + LOG.info("Created new defragmented DB instance for snapshot: {}", snapshotInfo.getName()); + + // 2. & 3. Iterate through tracked column families and copy all key-value pairs + long totalKeysCopied = 0; + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + LOG.info("Starting defragmentation of column family: {} for snapshot: {}", + cfName, snapshotInfo.getName()); + + RocksDatabase.ColumnFamily originalCf = originalDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily defraggedCf = defraggedDb.getColumnFamily(cfName); + + if (originalCf == null) { + LOG.warn("Column family {} not found in original DB, skipping", cfName); + continue; + } + + if (defraggedCf == null) { + LOG.warn("Column family {} not found in defragmented DB, skipping", cfName); + continue; + } + + long cfKeysCopied = 0; + try (ManagedWriteBatch writeBatch = new ManagedWriteBatch(); + ManagedRocksIterator iterator = originalDb.newIterator(originalCf)) { + + iterator.get().seekToFirst(); + + while (iterator.get().isValid()) { + byte[] key = iterator.get().key(); + byte[] value = iterator.get().value(); + + // Add to batch for efficient writing + defraggedCf.batchPut(writeBatch, key, value); + cfKeysCopied++; + + // Commit batch every 1000 keys to avoid memory issues + if (cfKeysCopied % 1000 == 0) { + defraggedDb.batchWrite(writeBatch); + writeBatch.clear(); + LOG.debug("Copied {} keys for column family {} so far", cfKeysCopied, cfName); + } + + iterator.get().next(); + } + + // Commit any remaining keys in the batch + if (writeBatch.count() > 0) { + defraggedDb.batchWrite(writeBatch); + } + + totalKeysCopied += cfKeysCopied; + LOG.info("Completed copying {} keys for column family: {} in snapshot: {}", + cfKeysCopied, cfName, snapshotInfo.getName()); + } + } + + LOG.info("Copied total of {} keys across all column families for snapshot: {}", + totalKeysCopied, snapshotInfo.getName()); + + // 4. Perform compaction on the new DB to ensure it's fully defragmented + LOG.info("Starting compaction of defragmented DB for snapshot: {}", snapshotInfo.getName()); + try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) { + compactOptions.setChangeLevel(true); + compactOptions.setTargetLevel(1); + defraggedDb.compactDB(compactOptions); + } + LOG.info("Completed compaction of defragmented DB for snapshot: {}", snapshotInfo.getName()); + + // 5. 
Verify data integrity between original and defragmented DBs + verifyDbIntegrity(originalDb, defraggedDb, snapshotInfo); + + // Close the defragmented DB + defraggedStore.close(); + + LOG.info("Successfully completed full defragmentation for snapshot: {} with {} keys copied", + snapshotInfo.getName(), totalKeysCopied); + + } catch (RocksDatabaseException e) { + LOG.error("RocksDB error during defragmentation of snapshot: {}", snapshotInfo.getName(), e); + throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e); + } catch (Exception e) { + LOG.error("Unexpected error during defragmentation of snapshot: {}", snapshotInfo.getName(), e); + throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e); + } + } + + /** + * Performs incremental defragmentation using diff from previous defragmented snapshot. + */ + private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot, + SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot) + throws IOException { + + String currentSnapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), currentSnapshot); + String previousSnapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), previousDefraggedSnapshot); + + // Fix path construction similar to performFullDefragmentation + String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().toString(); + String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString(); + String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, + previousCheckpointDirName).toString(); + + String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString(); + String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString(); + String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, + currentCheckpointDirName).toString(); + + LOG.info("Starting incremental defragmentation for snapshot: {} using previous: {}", + currentSnapshot.getName(), previousDefraggedSnapshot.getName()); + LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath); + LOG.info("Current target DB: {}", currentDefraggedDbPath); + + // Note: Don't create target directory - RocksDB createCheckpoint() will create it + // and will fail with "Directory exists" if we create it first + + try { + // Check if previous defragmented DB exists + if (!Files.exists(Paths.get(previousDefraggedDbPath))) { + LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation", + previousDefraggedDbPath); + performFullDefragmentation(currentSnapshot, currentOmSnapshot); + return; + } + + // 1. Create a checkpoint from the previous defragmented DB directly at target location + LOG.info("Creating checkpoint from previous defragmented DB directly to target location"); + + // Open the previous defragmented DB to create checkpoint. 
+ // TODO: via SnapshotCache or something equivalent for lock protection + DBStoreBuilder previousDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) + .setName(previousCheckpointDirName) + .setPath(Paths.get(previousDefraggedDbPath).getParent()) + .setOpenReadOnly(true) + .setCreateCheckpointDirs(false); + + // Add tracked column families + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + previousDbBuilder.addTable(cfName); + } + + try (DBStore previousDefraggedStore = previousDbBuilder.build()) { + RocksDatabase previousDefraggedDb = ((RDBStore) previousDefraggedStore).getDb(); + + // Create checkpoint directly at the target location + try (RocksDatabase.RocksCheckpoint checkpoint = previousDefraggedDb.createCheckpoint()) { + checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath)); + LOG.info("Created checkpoint directly at target: {}", currentDefraggedDbPath); + } + } + + // 2. Open the checkpoint as our working defragmented DB and apply incremental changes + DBStoreBuilder currentDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) + .setName(currentCheckpointDirName) + .setPath(Paths.get(currentDefraggedDbPath).getParent()) + .setCreateCheckpointDirs(false); + + // Add tracked column families + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + currentDbBuilder.addTable(cfName); + } + + // Build DB from the checkpoint + DBStore currentDefraggedStore = currentDbBuilder.build(); + RocksDatabase currentDefraggedDb = ((RDBStore) currentDefraggedStore).getDb(); + + LOG.info("Opened checkpoint as working defragmented DB for incremental update"); + + // 3. Apply incremental changes from current snapshot + RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore(); + RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb(); + + long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore, + currentSnapshot, previousDefraggedSnapshot); + + LOG.info("Applied {} incremental changes for snapshot: {}", + incrementalKeysCopied, currentSnapshot.getName()); + + // 4. Perform compaction on the updated DB +// LOG.info("Starting compaction of incrementally defragmented DB for snapshot: {}", +// currentSnapshot.getName()); +// try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) { +// compactOptions.setChangeLevel(true); +// compactOptions.setTargetLevel(1); +// currentDefraggedDb.compactDB(compactOptions); +// } +// LOG.info("Completed compaction of incrementally defragmented DB"); + + // 5. Verify data integrity + verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot); + + // Close the defragmented DB. 
TODO: Close in finally block instead + currentDefraggedStore.close(); + + LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes", + currentSnapshot.getName(), incrementalKeysCopied); + + } catch (RocksDatabaseException e) { + LOG.error("RocksDB error during incremental defragmentation of snapshot: {}", + currentSnapshot.getName(), e); +// LOG.warn("Falling back to full defragmentation due to error"); +// performFullDefragmentation(currentSnapshot, currentOmSnapshot); + } catch (Exception e) { + LOG.error("Unexpected error during incremental defragmentation of snapshot: {}", + currentSnapshot.getName(), e); + LOG.warn("Falling back to full defragmentation due to error"); + performFullDefragmentation(currentSnapshot, currentOmSnapshot); + } + } + + /** + * Applies incremental changes by using snapshotDiff to compute the diff list, + * then iterating that diff list against the current snapshot checkpoint DB. + * Uses RDBSstFileWriter to directly write changes to SST files and then ingests them. + */ + @SuppressWarnings("checkstyle:MethodLength") + private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore, + SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws RocksDatabaseException { + + LOG.info("Applying incremental changes for snapshot: {} since: {} using snapshotDiff approach", + currentSnapshot.getName(), previousSnapshot.getName()); + + long totalChanges = 0; + + // Create temporary directory for SST files + String currentSnapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), currentSnapshot); + String parentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString(); + String tempSstDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, TEMP_DIFF_DIR).toString(); + + try { + Files.createDirectories(Paths.get(tempSstDir)); + LOG.info("Created temporary SST directory: {}", tempSstDir); + + // Use snapshotDiff to compute the diff list between previous and current snapshot + LOG.info("Computing snapshot diff between {} and {}", + previousSnapshot.getName(), currentSnapshot.getName()); + + SnapshotDiffResponse diffResponse; + try { + // Call snapshotDiff to get the diff list + diffResponse = ozoneManager.snapshotDiff( + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName(), + previousSnapshot.getName(), + currentSnapshot.getName(), + null, // token - start from beginning + 0, // pageSize - get all diffs at once + false, // forceFullDiff + false // disableNativeDiff + ); + + // Wait for snapshotDiff computation to complete if it's still in progress + while (diffResponse.getJobStatus() == SnapshotDiffResponse.JobStatus.IN_PROGRESS || + diffResponse.getJobStatus() == SnapshotDiffResponse.JobStatus.QUEUED) { + LOG.info("Snapshot diff computation in progress, waiting {} ms...", + diffResponse.getWaitTimeInMs()); + // TODO: This can be improved by triggering snapdiff first, before any locks are grabbed, + // so that we don't have to wait here + Thread.sleep(diffResponse.getWaitTimeInMs()); + + // Poll for updated status + diffResponse = ozoneManager.snapshotDiff( + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName(), + previousSnapshot.getName(), + currentSnapshot.getName(), + null, // token + 0, // pageSize + false, // forceFullDiff + false // disableNativeDiff + ); + } + + if (diffResponse.getJobStatus() != SnapshotDiffResponse.JobStatus.DONE) { + throw new RocksDatabaseException("Snapshot diff computation failed with 
status: " + + diffResponse.getJobStatus() + ", reason: " + diffResponse.getReason()); + } + + LOG.info("Snapshot diff computation completed successfully"); + + } catch (Exception e) { + throw new RocksDatabaseException("Failed to compute snapshot diff", e); + } + + SnapshotDiffReportOzone diffReport = diffResponse.getSnapshotDiffReport(); + if (diffReport == null || diffReport.getDiffList() == null) { + LOG.info("No differences found between snapshots, no changes to apply"); + return 0; + } + + // TODO: Handle pagination when diffList is bigger than server page size + // 2025-08-16 09:10:52,500 [IPC Server handler 1 on default port 9862] INFO + // om.SnapshotDefragService: Found 1000 differences to process + LOG.info("Found {} differences to process", diffReport.getDiffList().size()); + + // Get table references for target database + RDBStore targetRdbStore = (RDBStore) targetStore; + RocksDatabase targetDb = targetRdbStore.getDb(); + + int nextToken = 0; + while (diffReport.getDiffList() != null && !diffReport.getDiffList().isEmpty()) { + final List diffList = diffReport.getDiffList(); + + // Group diff entries by column family and process each CF separately + // TODO: Use bucket layout to determine which column families to process + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName); + + if (currentCf == null || targetCf == null) { + LOG.warn("Column family {} not found, skipping incremental changes", cfName); + continue; + } + + Table targetTable = targetRdbStore.getTable(cfName); + if (targetTable == null) { + LOG.warn("Table {} not found in target store, skipping", cfName); + continue; + } + + long cfChanges = 0; + String sstFileName = cfName + "_" + currentSnapshot.getSnapshotId() + ".sst"; + File sstFile = new File(tempSstDir, sstFileName); + + LOG.debug("Creating SST file for column family {} changes: {}", cfName, sstFile.getAbsolutePath()); + + // Use RDBSstFileWriter to write changes to SST file + try (RDBSstFileWriter sstWriter = new RDBSstFileWriter(sstFile)) { + + // Iterate through the diff list and process each entry + for (DiffReportEntry diffEntry : diffList) { + String sourcePath = new String(diffEntry.getSourcePath(), StandardCharsets.UTF_8); + + // Extract the key from the path using volume and bucket from snapshot context + byte[] key = extractKeyFromPath(sourcePath, cfName, + currentSnapshot.getVolumeName(), currentSnapshot.getBucketName()); + if (key == null) { + continue; // Skip if this entry doesn't belong to current column family + } + + DiffType diffType = diffEntry.getType(); + + switch (diffType) { + case CREATE: + case MODIFY: + // Key was created or modified - get current value and write to SST + byte[] currentValue = currentSnapshotDb.get(currentCf, key); + if (currentValue != null) { + sstWriter.put(key, currentValue); + cfChanges++; + + if (cfChanges % 10000 == 0) { + LOG.debug("Written {} changes to SST file for column family {} so far", + cfChanges, cfName); + } + } + break; + + case DELETE: + // Key was deleted - write tombstone to SST + /* TODO: Sort keys before writing to SST file? +Caused by: org.rocksdb.RocksDBException: Keys must be added in strict ascending order. 
+at org.rocksdb.SstFileWriter.delete(Native Method) +at org.rocksdb.SstFileWriter.delete(SstFileWriter.java:178) +at org.apache.hadoop.hdds.utils.db.RDBSstFileWriter.delete(RDBSstFileWriter.java:65) + * */ + sstWriter.delete(key); + cfChanges++; + + if (cfChanges % 10000 == 0) { + LOG.debug("Written {} changes (including deletions) to SST file for column family {} so far", + cfChanges, cfName); + } + break; + + case RENAME: + // Handle rename - delete old key and create new key + if (diffEntry.getTargetPath() != null) { + String targetPath = new String(diffEntry.getTargetPath(), StandardCharsets.UTF_8); + byte[] newKey = extractKeyFromPath(targetPath, cfName, + currentSnapshot.getVolumeName(), currentSnapshot.getBucketName()); + + if (newKey != null) { + // Delete old key + sstWriter.delete(key); + + // Add new key with current value + byte[] newValue = currentSnapshotDb.get(currentCf, newKey); + if (newValue != null) { + sstWriter.put(newKey, newValue); + } + cfChanges += 2; // Count both delete and put + } + } + break; + + default: + LOG.warn("Unknown diff type: {}, skipping entry", diffType); + break; + } + } + + LOG.debug("Finished writing {} changes for column family: {} to SST file", + cfChanges, cfName); + + } catch (Exception e) { + LOG.error("Error processing column family {} for snapshot {}: {}", + cfName, currentSnapshot.getName(), e.getMessage(), e); + } + + // Ingest SST file into target database if there were changes + if (cfChanges > 0 && sstFile.exists() && sstFile.length() > 0) { + try { + targetTable.loadFromFile(sstFile); + LOG.info("Successfully ingested SST file for column family {}: {} changes", + cfName, cfChanges); + } catch (Exception e) { + LOG.error("Failed to ingest SST file for column family {}: {}", cfName, e.getMessage(), e); + } + } else { + LOG.debug("No changes to ingest for column family {}", cfName); + } + + // Clean up SST file after ingestion + try { + if (sstFile.exists()) { + Files.delete(sstFile.toPath()); + LOG.debug("Cleaned up SST file: {}", sstFile.getAbsolutePath()); + } + } catch (IOException e) { + LOG.warn("Failed to clean up SST file: {}", sstFile.getAbsolutePath(), e); + } + + totalChanges += cfChanges; + LOG.debug("Applied {} incremental changes for column family: {}", cfChanges, cfName); + } + + +// String lastToken = new String(diffList.get(diffList.size() - 1).getSourcePath(), StandardCharsets.UTF_8); + nextToken += diffList.size(); + LOG.debug("Retrieving next page of snapshot diff with token: {}", nextToken); + diffResponse = ozoneManager.snapshotDiff( + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName(), + previousSnapshot.getName(), + currentSnapshot.getName(), + String.valueOf(nextToken), // token + 0, // pageSize + false, // forceFullDiff + false // disableNativeDiff + ); + + if (diffResponse.getJobStatus() != SnapshotDiffResponse.JobStatus.DONE) { + throw new RocksDatabaseException("Expecting DONE but got unexpected snapshot diff status: " + + diffResponse.getJobStatus() + ", reason: " + diffResponse.getReason()); + } + + LOG.info("Retrieved next page of snapshot diff, size: {}", + diffResponse.getSnapshotDiffReport().getDiffList().size()); + diffReport = diffResponse.getSnapshotDiffReport(); + } + + // Clean up temporary directory + try { + Files.deleteIfExists(Paths.get(tempSstDir)); + LOG.debug("Cleaned up temporary SST directory: {}", tempSstDir); + } catch (IOException e) { + LOG.warn("Failed to clean up temporary SST directory: {}", tempSstDir, e); + } + + } catch (IOException e) { + throw new 
RocksDatabaseException("Failed to create temporary SST directory: " + tempSstDir, e); + } + + LOG.info("Applied {} total incremental changes using snapshotDiff approach", totalChanges); + return totalChanges; + } + + /** + * Extracts the database key from a diff report path for a specific column family. + * This method converts paths from snapshot diff reports into database keys. + */ + private byte[] extractKeyFromPath(String path, String columnFamily, String volume, String bucket) { + try { + if (KEY_TABLE.equals(columnFamily)) { + // For keyTable, use OmMetadataManagerImpl#getOzoneKey + // Path in diff report contains just the key part (after volume/bucket) + String dbKey = ozoneManager.getMetadataManager().getOzoneKey(volume, bucket, path); + return dbKey.getBytes(StandardCharsets.UTF_8); + } else if (FILE_TABLE.equals(columnFamily)) { // TODO: FSO code path not tested + // For fileTable, use OmMetadataManagerImpl#getOzoneKeyFSO + // Path in diff report contains just the key part (after volume/bucket) + String dbKey = ozoneManager.getMetadataManager().getOzoneKeyFSO(volume, bucket, path); + return dbKey.getBytes(StandardCharsets.UTF_8); + } + + // If we can't extract a valid key for this column family, return null + // This will cause the entry to be skipped for this column family + return null; + + } catch (Exception e) { + LOG.warn("Failed to extract key from path: {} for column family: {}, volume: {}, bucket: {}, error: {}", + path, columnFamily, volume, bucket, e.getMessage(), e); + return null; + } + } + + /** + * Verifies DB integrity by comparing key counts and spot-checking keys/values + * between the original and defragmented databases. + */ + private void verifyDbIntegrity(RocksDatabase originalDb, RocksDatabase defraggedDb, + SnapshotInfo snapshotInfo) { + + LOG.info("Starting DB integrity verification for snapshot: {}", snapshotInfo.getName()); + + boolean verificationPassed = true; + long totalOriginalKeys = 0; + long totalDefraggedKeys = 0; + + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + LOG.debug("Verifying column family: {} for snapshot: {}", + cfName, snapshotInfo.getName()); + + RocksDatabase.ColumnFamily originalCf = originalDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily defraggedCf = defraggedDb.getColumnFamily(cfName); + + if (originalCf == null) { + LOG.warn("Column family {} not found in original DB, skipping verification", cfName); + continue; + } + + if (defraggedCf == null) { + LOG.error("Column family {} not found in defragmented DB, verification failed", cfName); + verificationPassed = false; + continue; + } + + try { + // Count keys in original DB + long originalKeyCount = 0; + try (ManagedRocksIterator originalIterator = originalDb.newIterator(originalCf)) { + originalIterator.get().seekToFirst(); + while (originalIterator.get().isValid()) { + originalKeyCount++; + originalIterator.get().next(); + } + } + + // Count keys in defragmented DB + long defraggedKeyCount = 0; + try (ManagedRocksIterator defraggedIterator = defraggedDb.newIterator(defraggedCf)) { + defraggedIterator.get().seekToFirst(); + while (defraggedIterator.get().isValid()) { + defraggedKeyCount++; + defraggedIterator.get().next(); + } + } + + totalOriginalKeys += originalKeyCount; + totalDefraggedKeys += defraggedKeyCount; + + // Verify key counts match + if (originalKeyCount != defraggedKeyCount) { + LOG.error("Key count mismatch for column family {}: original={}, defragmented={}", + cfName, originalKeyCount, defraggedKeyCount); + verificationPassed = false; + 
} else { + LOG.info("Key count verification passed for column family {}: {} keys", + cfName, originalKeyCount); + } + + // Full verification - check every single key-value pair + long fullCheckCount = 0; + long fullCheckErrors = 0; + + try (ManagedRocksIterator originalIterator = originalDb.newIterator(originalCf)) { + originalIterator.get().seekToFirst(); + + while (originalIterator.get().isValid()) { + byte[] originalKey = originalIterator.get().key(); + byte[] originalValue = originalIterator.get().value(); + + // Get the same key from defragmented DB + byte[] defraggedValue = defraggedDb.get(defraggedCf, originalKey); + + if (defraggedValue == null) { + LOG.error("Key missing in defragmented DB for column family {}: key #{}", + cfName, fullCheckCount); + verificationPassed = false; + fullCheckErrors++; + } else if (!java.util.Arrays.equals(originalValue, defraggedValue)) { + LOG.error("Value mismatch for column family {}: key #{}", + cfName, fullCheckCount); + verificationPassed = false; + fullCheckErrors++; + } + + fullCheckCount++; + + // Log progress every 10,000 keys to avoid log spam + if (fullCheckCount % 10000 == 0) { + LOG.debug("Full verification progress for column family {}: checked {} keys, {} errors so far", + cfName, fullCheckCount, fullCheckErrors); + } + + if (fullCheckErrors > 10) { + LOG.warn("Too many errors found during full verification for column family {}, stopping further checks", + cfName); + break; // Stop if too many errors to avoid flooding logs + } + + originalIterator.get().next(); + } + } + + if (fullCheckErrors == 0) { + LOG.info("Full verification PASSED for column family {}: all {} keys verified successfully", + cfName, fullCheckCount); + } else { + LOG.error("Full verification FAILED for column family {}: {} errors found out of {} keys checked", + cfName, fullCheckErrors, fullCheckCount); + } + + } catch (Exception e) { + LOG.error("Error during verification of column family {} for snapshot {}: {}", + cfName, snapshotInfo.getName(), e.getMessage(), e); + verificationPassed = false; + } + } + + // Log final verification results + if (verificationPassed) { + LOG.info("DB integrity verification PASSED for snapshot: {} " + + "(total original keys: {}, total defragmented keys: {})", + snapshotInfo.getName(), totalOriginalKeys, totalDefraggedKeys); + } else { + LOG.error("DB integrity verification FAILED for snapshot: {} " + + "(total original keys: {}, total defragmented keys: {})", + snapshotInfo.getName(), totalOriginalKeys, totalDefraggedKeys); + // Consider throwing an exception here if verification failure should halt the process + // throw new IOException("DB integrity verification failed for snapshot: " + snapshotInfo.getName()); + } + } + + /** + * Updates snapshot metadata to point to the new defragmented DB location. 
+ */ + private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws IOException { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + LOG.info("Updating snapshot metadata for: {} at path: {}", + snapshotInfo.getName(), snapshotPath); + + try { + // Read the snapshot's local metadata YAML + File yamlFile = new File(snapshotPath + ".yaml"); + OmSnapshotLocalDataYaml yamlData = + OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(), yamlFile); + + // Mark as defragmented by setting needsDefrag to false + yamlData.setNeedsDefrag(false); + + // Write updated YAML data + yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile); + + LOG.info("Successfully updated metadata for snapshot: {}, " + + "marked as defragmented (needsDefrag=false)", + snapshotInfo.getName()); + + } catch (IOException e) { + LOG.error("Failed to update metadata for snapshot: {}", snapshotInfo.getName(), e); + throw e; + } + } + + private final class SnapshotDefragTask implements BackgroundTask { + + @Override + public BackgroundTaskResult call() throws Exception { + // Check OM leader and readiness + if (shouldRun()) { + final long count = runCount.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating Snapshot Defragmentation Task: run # {}", count); + } + triggerSnapshotDefragOnce(); + } + + return EmptyTaskResult.newResult(); + } + } + + public synchronized boolean triggerSnapshotDefragOnce() throws IOException { + // Check if rocks-tools native lib is available + if (!isRocksToolsNativeLibAvailable()) { + LOG.warn("Rocks-tools native library is not available. " + + "Stopping SnapshotDefragService."); + return false; + } + + Optional snapshotManager = Optional.ofNullable(ozoneManager) + .map(OzoneManager::getOmSnapshotManager); + if (!snapshotManager.isPresent()) { + LOG.debug("OmSnapshotManager not available, skipping defragmentation task"); + return false; + } + + // Get the SnapshotChainManager to iterate through the global snapshot chain + // Set this each time the task runs just in case OmMetadataManager is restarted + snapshotChainManager = ((OmMetadataManagerImpl) ozoneManager.getMetadataManager()).getSnapshotChainManager(); + + final Table snapshotInfoTable = + ozoneManager.getMetadataManager().getSnapshotInfoTable(); + + long snapshotLimit = snapshotLimitPerTask; + + while (snapshotLimit > 0 && running.get()) { + // Find the first snapshot needing defragmentation + SnapshotInfo snapshotToDefrag = findFirstSnapshotNeedingDefrag(snapshotInfoTable); + + if (snapshotToDefrag == null) { + LOG.info("No snapshots found needing defragmentation"); + break; + } + + // Acquire SNAPSHOT_GC_LOCK + OMLockDetails gcLockDetails = ozoneManager.getMetadataManager().getLock() + .acquireWriteLock(SNAPSHOT_GC_LOCK, snapshotToDefrag.getSnapshotId().toString()); + + if (!gcLockDetails.isLockAcquired()) { + LOG.warn("Failed to acquire SNAPSHOT_GC_LOCK for snapshot: {}", + snapshotToDefrag.getName()); + break; + } + + LOG.debug("Acquired SNAPSHOT_GC_LOCK for snapshot: {}, ID: {}", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + try { + LOG.info("Processing snapshot defragmentation for: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + // Get snapshot through SnapshotCache for proper locking + try (UncheckedAutoCloseableSupplier snapshotSupplier = + snapshotManager.get().getActiveSnapshot( + snapshotToDefrag.getVolumeName(), +
snapshotToDefrag.getBucketName(), + snapshotToDefrag.getName())) { + + OmSnapshot omSnapshot = snapshotSupplier.get(); + + UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId(); + boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null; + if (isFirstSnapshotInPath) { + LOG.info("Performing full defragmentation for first snapshot (in path): {}", + snapshotToDefrag.getName()); + performFullDefragmentation(snapshotToDefrag, omSnapshot); + } else { + final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId); + SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey); + + LOG.info("Performing incremental defragmentation for snapshot: {} " + + "based on previous defragmented snapshot: {}", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + + // If previous path snapshot is not null, it must have been defragmented already + // Sanity check to ensure previous snapshot exists and is defragmented + if (needsDefragmentation(previousDefraggedSnapshot)) { + LOG.error("Fatal error before defragging snapshot: {}. " + + "Previous snapshot in path {} was not defragged while it is expected to be.", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + break; + } + + performIncrementalDefragmentation(snapshotToDefrag, + previousDefraggedSnapshot, omSnapshot); + } + + // Update snapshot metadata + updateSnapshotMetadata(snapshotToDefrag); + + // Close and evict the original snapshot DB from SnapshotCache + // TODO: Implement proper eviction from SnapshotCache + LOG.info("Defragmentation completed for snapshot: {}", + snapshotToDefrag.getName()); + + snapshotLimit--; + snapshotsDefraggedCount.getAndIncrement(); + + } catch (OMException ome) { + if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) { + LOG.info("Snapshot {} was deleted during defragmentation", + snapshotToDefrag.getName()); + } else { + LOG.error("OMException during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), ome); + } + } + + } catch (Exception e) { + LOG.error("Exception during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), e); + return false; + } finally { + // Release SNAPSHOT_GC_LOCK + ozoneManager.getMetadataManager().getLock() + .releaseWriteLock(SNAPSHOT_GC_LOCK, snapshotToDefrag.getSnapshotId().toString()); + LOG.debug("Released SNAPSHOT_GC_LOCK for snapshot: {}, ID: {}", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + } + } + + return true; + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new SnapshotDefragTask()); + return queue; + } + + /** + * Returns true if the service run conditions are satisfied, false otherwise. + */ + private boolean shouldRun() { + if (ozoneManager == null) { + // OzoneManager can be null for testing + return true; + } + if (ozoneManager.getOmRatisServer() == null) { + LOG.warn("OzoneManagerRatisServer is not initialized yet"); + return false; + } + // The service only runs if current OM node is ready + return running.get() && ozoneManager.isRunning(); + } + + public AtomicLong getSnapshotsDefraggedCount() { + return snapshotsDefraggedCount; + } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } + + @Override + public void shutdown() { + running.set(false); + super.shutdown(); + } +} +
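The earlier "TODO: Close in finally block instead" notes that currentDefraggedStore.close() is skipped if incremental defragmentation throws before reaching it. A minimal sketch of the intended shape, assuming the defragged DBStore is opened earlier in the same method and that the enclosing method can propagate the IOException from close(); openDefraggedStore is a hypothetical stand-in for that open step, not an API in this patch.

// Sketch only: ensure the defragged store is closed on both success and failure paths.
DBStore currentDefraggedStore = null;
try {
  currentDefraggedStore = openDefraggedStore(currentSnapshot);  // hypothetical helper for the open step above
  long incrementalKeysCopied = applyIncrementalChanges(
      currentSnapshotDb, currentDefraggedStore, currentSnapshot, previousSnapshot);
  LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes",
      currentSnapshot.getName(), incrementalKeysCopied);
} finally {
  if (currentDefraggedStore != null) {
    currentDefraggedStore.close();
  }
}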
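The wait loop around ozoneManager.snapshotDiff(...) in applyIncrementalChanges sleeps for getWaitTimeInMs() at a time with no overall bound, which is what the TODO about triggering snapdiff before any locks are grabbed is concerned with. A hedged sketch of one way to keep the wait bounded and reusable; the helper name and the deadlineMs parameter are assumptions, while the snapshotDiff call, the JobStatus values, and getWaitTimeInMs()/getReason() are the same ones this patch already uses. The same helper could also be invoked before SNAPSHOT_GC_LOCK is acquired, so the lock is not held while the diff job runs.

// Sketch only: poll the asynchronous snapshot-diff job with an overall deadline so the
// defrag task cannot block indefinitely on a long-running diff computation.
private SnapshotDiffResponse waitForSnapshotDiff(SnapshotInfo prev, SnapshotInfo curr, long deadlineMs)
    throws IOException, InterruptedException {
  while (true) {
    SnapshotDiffResponse response = ozoneManager.snapshotDiff(
        curr.getVolumeName(), curr.getBucketName(),
        prev.getName(), curr.getName(),
        null,   // token - start from beginning
        0,      // pageSize
        false,  // forceFullDiff
        false); // disableNativeDiff
    if (response.getJobStatus() == SnapshotDiffResponse.JobStatus.DONE) {
      return response;
    }
    if (response.getJobStatus() != SnapshotDiffResponse.JobStatus.IN_PROGRESS
        && response.getJobStatus() != SnapshotDiffResponse.JobStatus.QUEUED) {
      throw new IOException("Snapshot diff failed with status: "
          + response.getJobStatus() + ", reason: " + response.getReason());
    }
    long remainingMs = deadlineMs - System.currentTimeMillis();
    if (remainingMs <= 0) {
      throw new IOException("Timed out waiting for snapshot diff between "
          + prev.getName() + " and " + curr.getName());
    }
    Thread.sleep(Math.min(response.getWaitTimeInMs(), remainingMs));
  }
}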
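The stack trace quoted in the "TODO: Sort keys before writing to SST file?" comment is the RocksDB SstFileWriter rejecting out-of-order keys: diff entries arrive in report order, not key order. One hedged way to satisfy the strict-ascending-order requirement is to buffer the per-column-family mutations in a map sorted with RocksDB's default unsigned byte-wise ordering and only then replay them into RDBSstFileWriter. The comparator and the buffering TreeMap below are illustrative assumptions (needs java.util.Comparator, java.util.TreeMap, java.util.Map); put/delete are the writer methods this patch already uses, and a later write for the same key simply overwrites the buffered one.

// Sketch: RocksDB's default comparator orders keys as unsigned bytes, so the buffer
// must be sorted the same way before anything is written to the SST file.
private static final Comparator<byte[]> UNSIGNED_BYTEWISE = (a, b) -> {
  int len = Math.min(a.length, b.length);
  for (int i = 0; i < len; i++) {
    int cmp = (a[i] & 0xff) - (b[i] & 0xff);
    if (cmp != 0) {
      return cmp;
    }
  }
  return a.length - b.length;
};

// Buffer mutations first; a null value is treated as a tombstone (delete).
TreeMap<byte[], byte[]> pendingMutations = new TreeMap<>(UNSIGNED_BYTEWISE);
// ... while walking the diff list, instead of writing immediately:
//   pendingMutations.put(key, currentValue);   // CREATE / MODIFY
//   pendingMutations.put(key, null);           // DELETE
// ... then replay in ascending key order once the diff list is exhausted:
try (RDBSstFileWriter sstWriter = new RDBSstFileWriter(sstFile)) {
  for (Map.Entry<byte[], byte[]> entry : pendingMutations.entrySet()) {
    if (entry.getValue() == null) {
      sstWriter.delete(entry.getKey());
    } else {
      sstWriter.put(entry.getKey(), entry.getValue());
    }
  }
}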
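extractKeyFromPath relies on the diff-report path already being the bucket-relative key name, so for the keyTable the resulting DB key is just the volume, bucket, and key joined by the OM key prefix. A small hedged illustration of that assumption; the literal values are made up, and the "/" separator is assumed to be what getOzoneKey produces.

// Illustration only: if the diff report lists the entry "dir1/file1" for
// volume "vol1" / bucket "buck1", the keyTable lookup key is expected to be:
String dbKey = ozoneManager.getMetadataManager().getOzoneKey("vol1", "buck1", "dir1/file1");
// dbKey == "/vol1/buck1/dir1/file1" (assuming the default OM key prefix "/")
byte[] rawKey = dbKey.getBytes(StandardCharsets.UTF_8);
// The fileTable (FSO) case differs: its keys are built from volume/bucket/parent object
// IDs rather than names, which is presumably why the patch marks that path as untested.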
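verifyDbIntegrity currently does one point lookup per original key on top of two counting passes. Since both column families are iterated in the same byte-wise key order, a single lock-step pass over two iterators can check keys, values, and counts together. A hedged sketch under that assumption, reusing the newIterator/ManagedRocksIterator pattern and the cfName, originalDb/defraggedDb, and originalCf/defraggedCf names from the surrounding method; it stops at the first divergence rather than cataloguing every error.

// Sketch: compare the two column families in one lock-step pass.
try (ManagedRocksIterator origIt = originalDb.newIterator(originalCf);
     ManagedRocksIterator defragIt = defraggedDb.newIterator(defraggedCf)) {
  origIt.get().seekToFirst();
  defragIt.get().seekToFirst();
  boolean diverged = false;
  long checked = 0;
  while (origIt.get().isValid() && defragIt.get().isValid()) {
    if (!java.util.Arrays.equals(origIt.get().key(), defragIt.get().key())
        || !java.util.Arrays.equals(origIt.get().value(), defragIt.get().value())) {
      LOG.error("First divergence in column family {} after {} matching entries", cfName, checked);
      diverged = true;
      break;
    }
    checked++;
    origIt.get().next();
    defragIt.get().next();
  }
  // Leftover entries on either side mean the key counts differ as well.
  boolean sameLength = !origIt.get().isValid() && !defragIt.get().isValid();
  LOG.info("Lock-step verification for {}: diverged={}, sameKeyCount={}, entriesChecked={}",
      cfName, diverged, sameLength, checked);
}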