diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index aecbdfae615d..03250b88e0e0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -521,9 +521,13 @@ public final class OzoneConsts { public static final String OM_SNAPSHOT_DIR = "db.snapshots"; public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "checkpointState"; - public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = "checkpointStateDefragged"; + public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = OM_SNAPSHOT_DIR + + OM_KEY_PREFIX + "checkpointStateDefragged"; + public static final String TEMP_DIFF_SST_FILES_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR + + OM_KEY_PREFIX + "tempDiffSstFiles"; public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "diffState"; + public static final String SNAPSHOT_DEFRAG_VERSION_SUFFIX_PREFIX = "-v"; public static final String OM_SNAPSHOT_INDICATOR = ".snapshot"; public static final String OM_SNAPSHOT_DIFF_DB_NAME = "db.snapdiff"; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index e3853a84211c..d9a234b1009f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR; import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_INFO_TABLE; @@ -66,6 +67,7 @@ public class RDBStore implements DBStore { private final RDBCheckpointManager checkPointManager; private final String checkpointsParentDir; private final String snapshotsParentDir; + private final String defraggedSnapshotsParentDir; private final RDBMetrics rdbMetrics; private final RocksDBCheckpointDiffer rocksDBCheckpointDiffer; @@ -133,6 +135,7 @@ public class RDBStore implements DBStore { if (!createCheckpointDirs) { checkpointsParentDir = null; snapshotsParentDir = null; + defraggedSnapshotsParentDir = null; } else { Path checkpointsParentDirPath = Paths.get(dbLocation.getParent(), OM_CHECKPOINT_DIR); @@ -143,6 +146,12 @@ public class RDBStore implements DBStore { Paths.get(dbLocation.getParent(), OM_SNAPSHOT_CHECKPOINT_DIR); snapshotsParentDir = snapshotsParentDirPath.toString(); Files.createDirectories(snapshotsParentDirPath); + + // TODO: Put this behind a feature flag + Path defraggedSnapshotsParentDirPath = + Paths.get(dbLocation.getParent(), OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR); + defraggedSnapshotsParentDir = defraggedSnapshotsParentDirPath.toString(); + Files.createDirectories(defraggedSnapshotsParentDirPath); } if (enableCompactionDag) { @@ -193,6 +202,10 @@ public String getSnapshotsParentDir() { return snapshotsParentDir; } + public String getDefraggedSnapshotsParentDir() { + return 
defraggedSnapshotsParentDir; + } + @Override public RocksDBCheckpointDiffer getRocksDBCheckpointDiffer() { return rocksDBCheckpointDiffer; diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index b93626060c80..769e47214afe 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -242,7 +242,7 @@ public boolean isClosed() { * * @see ManagedCheckpoint */ - final class RocksCheckpoint implements Closeable { + public final class RocksCheckpoint implements Closeable { private final ManagedCheckpoint checkpoint; private RocksCheckpoint() { @@ -614,7 +614,7 @@ public List getLiveFilesMetaData() throws RocksDatabaseExcepti } } - RocksCheckpoint createCheckpoint() { + public RocksCheckpoint createCheckpoint() { return new RocksCheckpoint(); } @@ -665,7 +665,7 @@ public Collection getExtraColumnFamilies() { return Collections.unmodifiableCollection(columnFamilies.values()); } - byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException { + public byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { return db.get().get(family.getHandle(), key); } catch (RocksDBException e) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index 3d542785e113..5778efe94e8a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -26,6 +26,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MULTIPART_FORM_DATA_BOUNDARY; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA; @@ -497,6 +498,9 @@ private void testWriteDbDataToStream() throws Exception { Set initialFullSet = getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), metaDirLength); + // TODO: OM bootstrapping is not aware of defrag dir yet. Remove the workaround workaround when implemented + initialFullSet.remove(OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR); + assertThat(finalFullSet).contains(expectedLogStr); assertThat(finalFullSet).contains(expectedSstStr); assertEquals(initialFullSet, finalFullSet, "expected snapshot files not found"); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService.java new file mode 100644 index 000000000000..0c8a903cacdd --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.ozone.test.LambdaTestUtils.await; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.utils.db.DBProfile; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMStorage; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.SnapshotDefragService; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * Test SnapshotDefragService functionality using MiniOzoneCluster. 
+ */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class TestSnapshotDefragService { + + private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotDefragService.class); + + private MiniOzoneCluster cluster; + private OzoneClient client; + private ObjectStore store; + private OzoneManager ozoneManager; + private SnapshotDefragService defragService; + private static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_VALUE = 3; + + @BeforeAll + void setup() throws Exception { + // Enable debug logging for SnapshotDefragService + GenericTestUtils.setLogLevel(LoggerFactory.getLogger(SnapshotDefragService.class), Level.DEBUG); + GenericTestUtils.setLogLevel(LoggerFactory.getLogger(OmMetadataManagerImpl.class), Level.DEBUG); + GenericTestUtils.setLogLevel(LoggerFactory.getLogger(SnapshotCache.class), Level.DEBUG); + + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 30, TimeUnit.SECONDS); + conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST); + conf.setBoolean(OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); + conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, BucketLayout.OBJECT_STORE.name()); + // TODO: Add SNAPSHOT_DEFRAGMENTATION layout feature version + conf.setInt(OMStorage.TESTING_INIT_LAYOUT_VERSION_KEY, + OMLayoutFeature.DELEGATION_TOKEN_SYMMETRIC_SIGN.layoutVersion()); + + conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, -1); + conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, 3000000); + conf.setInt(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, SNAPSHOT_DEFRAG_LIMIT_PER_TASK_VALUE); + + conf.setQuietMode(false); + + // Create MiniOzoneCluster + cluster = MiniOzoneCluster.newBuilder(conf).build(); + cluster.waitForClusterToBeReady(); + + client = cluster.newClient(); + store = client.getObjectStore(); + ozoneManager = cluster.getOzoneManager(); + + // Create SnapshotDefragService for manual triggering + defragService = new SnapshotDefragService( + 10000, // interval + TimeUnit.MILLISECONDS, + 30000, // service timeout + ozoneManager, + conf + ); + } + + @AfterAll + public void cleanup() throws Exception { + if (defragService != null) { + defragService.shutdown(); + } + if (client != null) { + client.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Create keys in a bucket. + */ + private void createKeys(String volumeName, String bucketName, int keyCount) throws Exception { + OzoneVolume volume = store.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + + for (int i = 0; i < keyCount; i++) { + String keyName = "key-" + i; + String data = RandomStringUtils.randomAlphabetic(100); + + try (OzoneOutputStream outputStream = bucket.createKey(keyName, data.length(), + StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE), + Collections.emptyMap(), Collections.emptyMap())) { + outputStream.write(data.getBytes(StandardCharsets.UTF_8)); + } + } + LOG.info("Created {} keys in bucket {}/{}", keyCount, volumeName, bucketName); + } + + /** + * Create a snapshot and wait for it to be available. 
+ */ + private void createSnapshot(String volumeName, String bucketName, String snapshotName) + throws Exception { + // Get existing checkpoint directories before creating snapshot + Set existingCheckpoints = getExistingCheckpointDirectories(); + + store.createSnapshot(volumeName, bucketName, snapshotName); + + // Wait for snapshot to be created + GenericTestUtils.waitFor(() -> { + try { + SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName)); + return snapshotInfo != null; + } catch (IOException e) { + return false; + } + }, 1000, 30000); + + // Wait for checkpoint DB to be created + waitForCheckpointDB(snapshotName, existingCheckpoints); + + LOG.info("Created snapshot: {}", snapshotName); + } + + /** + * Get existing checkpoint directories before snapshot creation. + */ + private Set getExistingCheckpointDirectories() { + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState"; + File checkpointParentDir = new File(checkpointStateDir); + + Set existingDirs = new java.util.HashSet<>(); + if (checkpointParentDir.exists()) { + File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory); + if (checkpointDirs != null) { + for (File dir : checkpointDirs) { + existingDirs.add(dir.getName()); + } + } + } + + LOG.debug("Existing checkpoint directories: {}", existingDirs); + return existingDirs; + } + + /** + * Wait for checkpoint DB to be created under checkpointState directory. + */ + private void waitForCheckpointDB(String snapshotName, Set existingCheckpoints) throws Exception { + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState"; + File checkpointParentDir = new File(checkpointStateDir); + + LOG.info("Waiting for new checkpoint DB to be created under: {}", checkpointStateDir); + + GenericTestUtils.waitFor(() -> { + if (!checkpointParentDir.exists()) { + LOG.debug("CheckpointState directory does not exist yet: {}", checkpointStateDir); + return false; + } + + // List all directories in checkpointState + File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory); + if (checkpointDirs == null || checkpointDirs.length == 0) { + LOG.debug("No checkpoint directories found in: {}", checkpointStateDir); + return false; + } + + // Look for new checkpoint directories that weren't there before + for (File checkpointDir : checkpointDirs) { + String dirName = checkpointDir.getName(); + + // Skip if this directory existed before snapshot creation + if (existingCheckpoints.contains(dirName)) { + continue; + } + + // Check if the new directory contains database files + File[] dbFiles = checkpointDir.listFiles(); + if (dbFiles != null && dbFiles.length > 0) { + for (File dbFile : dbFiles) { + if (dbFile.isFile() && (dbFile.getName().endsWith(".sst") || + dbFile.getName().equals("CURRENT") || + dbFile.getName().startsWith("MANIFEST"))) { + LOG.info("New checkpoint DB found for snapshot {} in directory: {}", + snapshotName, checkpointDir.getAbsolutePath()); + return true; + } + } + } + + LOG.debug("New checkpoint directory found but no DB files yet: {}", 
checkpointDir.getAbsolutePath()); + } + + LOG.debug("Waiting for new checkpoint DB files to appear in checkpointState directories"); + return false; + }, 1000, 60000); // Wait up to 60 seconds for checkpoint DB creation + + LOG.info("Checkpoint DB created successfully for snapshot: {}", snapshotName); + } + + /** + * Lists the contents of the db.snapshots directory. + */ + private void printSnapshotDirectoryListing(String description) { + LOG.info("=== {} ===", description); + String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs"); + if (metadataDir == null) { + metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs"); + } + + String snapshotDir = metadataDir + "/db.snapshots"; + File snapshotsDir = new File(snapshotDir); + + if (!snapshotsDir.exists()) { + LOG.info("Snapshots directory does not exist: {}", snapshotDir); + return; + } + + try (Stream paths = Files.walk(Paths.get(snapshotDir))) { + paths.sorted() + .forEach(path -> { + File file = path.toFile(); + String relativePath = Paths.get(snapshotDir).relativize(path).toString(); + if (file.isDirectory()) { + LOG.info("Directory: {}/", relativePath.isEmpty() ? "." : relativePath); + } else { + LOG.info("File: {} (size: {} bytes)", relativePath, file.length()); + } + }); + } catch (IOException e) { + LOG.error("Error listing snapshot directory: {}", snapshotDir, e); + } + } + + /** + * Trigger the SnapshotDefragService by starting it and waiting for it to process snapshots. + */ + private void triggerSnapshotDefragService() throws Exception { + LOG.info("Triggering SnapshotDefragService ..."); + + // Mark all snapshots as needing defragmentation first + OMMetadataManager metadataManager = ozoneManager.getMetadataManager(); + try (TableIterator> iterator = + metadataManager.getSnapshotInfoTable().iterator()) { + iterator.seekToFirst(); + while (iterator.hasNext()) { + Table.KeyValue entry = iterator.next(); + SnapshotInfo snapshotInfo = entry.getValue(); + LOG.info("snapshot {}, needsDefragmentation = {}", + snapshotInfo.getName(), defragService.needsDefragmentation(snapshotInfo)); + } + } + + long initialDefragCount = defragService.getSnapshotsDefraggedCount().get(); + LOG.info("Initial defragmented count: {}", initialDefragCount); + + // Start the service + defragService.start(); + + // Wait for the service to process snapshots + try { + await(30000, 1000, () -> { + long currentCount = defragService.getSnapshotsDefraggedCount().get(); + LOG.info("Current defragmented count: {}", currentCount); + return currentCount > initialDefragCount && currentCount >= SNAPSHOT_DEFRAG_LIMIT_PER_TASK_VALUE; + }); + } catch (TimeoutException e) { + LOG.warn("Timeout waiting for defragmentation to complete, continuing with test"); + } + + LOG.info("SnapshotDefragService execution completed. 
Snapshots defragmented: {}", + defragService.getSnapshotsDefraggedCount().get()); + } + + @Test + public void testSnapshotDefragmentation() throws Exception { + String volumeName = "test-volume"; + String bucketName = "test-bucket"; + + // Create volume and bucket + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + // TODO: Test FSO bucket as well, default is LEGACY / OBJECT_STORE + volume.createBucket(bucketName); + + LOG.info("Starting snapshot defragmentation test..."); + + // Print initial state + printSnapshotDirectoryListing("Initial state - no snapshots"); + + // Step 1: Create 2 keys, then create snap-1 + createKeys(volumeName, bucketName, 2); + createSnapshot(volumeName, bucketName, "snap-1"); + printSnapshotDirectoryListing("After creating snap-1 (2 keys)"); + + // Step 2: Create 2 more keys, then create snap-2 + createKeys(volumeName, bucketName, 2); // TODO: This actually overwrites the previous keys + createSnapshot(volumeName, bucketName, "snap-2"); + printSnapshotDirectoryListing("After creating snap-2 (4 keys total)"); + + // Step 3: Create 2 more keys, then create snap-3 + createKeys(volumeName, bucketName, 2); // TODO: This actually overwrites the previous keys + createSnapshot(volumeName, bucketName, "snap-3"); + printSnapshotDirectoryListing("After creating snap-3 (6 keys total)"); + + // Step 4: Trigger SnapshotDefragService + triggerSnapshotDefragService(); + printSnapshotDirectoryListing("After SnapshotDefragService execution"); + + // Verify that the snapshots still exist + SnapshotInfo snap1 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-1")); + SnapshotInfo snap2 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-2")); + SnapshotInfo snap3 = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-3")); + + assertNotNull(snap1, "Snapshot snap-1 should exist"); + assertNotNull(snap2, "Snapshot snap-2 should exist"); + assertNotNull(snap3, "Snapshot snap-3 should exist"); + + LOG.info("Test completed successfully"); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index c7b071a6e8d9..a247ee986e7f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER; import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_MAX_OPEN_FILES; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_MAX_OPEN_FILES_DEFAULT; @@ -262,15 +263,20 @@ protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name) } // metadata constructor for snapshots - OmMetadataManagerImpl(OzoneConfiguration conf, String snapshotDirName, int maxOpenFiles) throws IOException { + OmMetadataManagerImpl(OzoneConfiguration conf, String 
snapshotDirName, int maxOpenFiles, + int snapshotLocalDataVersion) throws IOException { try { lock = new OmReadOnlyLock(); hierarchicalLockManager = new ReadOnlyHierarchicalResourceLockManager(); omEpoch = 0; - String snapshotDir = OMStorage.getOmDbDir(conf) + - OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR; + String snapshotDir = OMStorage.getOmDbDir(conf) + OM_KEY_PREFIX + + (snapshotLocalDataVersion <= 0 ? OM_SNAPSHOT_CHECKPOINT_DIR : OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR); File metaDir = new File(snapshotDir); String dbName = OM_DB_NAME + snapshotDirName; + if (snapshotLocalDataVersion > 0) { + dbName += OzoneConsts.SNAPSHOT_DEFRAG_VERSION_SUFFIX_PREFIX + snapshotLocalDataVersion; + } + LOG.debug("snapshotLocalDataVersion = {}. Final dbName = {}", snapshotLocalDataVersion, dbName); checkSnapshotDirExist(Paths.get(metaDir.toPath().toString(), dbName).toFile()); final boolean enableRocksDBMetrics = conf.getBoolean( OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index 7b9beb80cf6f..7599eb6180ce 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -102,6 +102,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; import org.apache.hadoop.ozone.om.snapshot.SnapshotCache; import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; @@ -415,8 +416,9 @@ public OmSnapshot load(@Nonnull UUID snapshotId) throws IOException { "' with txnId : '" + TransactionInfo.fromByteString(snapshotInfo.getCreateTransactionInfo()) + "' has not been flushed yet. 
Please wait a few more seconds before retrying", TIMEOUT); } + int snapshotLocalDataVersion = OmSnapshotUtils.getLocalDataVersion(ozoneManager, snapshotInfo); snapshotMetadataManager = new OmMetadataManagerImpl(conf, - snapshotInfo.getCheckpointDirName(), maxOpenSstFilesInSnapshotDb); + snapshotInfo.getCheckpointDirName(), maxOpenSstFilesInSnapshotDb, snapshotLocalDataVersion); } catch (IOException e) { LOG.error("Failed to retrieve snapshot: {}", snapshotTableKey, e); throw e; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java index 212953cd874c..ba2a9fae3833 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java @@ -17,34 +17,70 @@ package org.apache.hadoop.ozone.om; +import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE; import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_GC_LOCK; +import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap; import com.google.common.annotations.VisibleForTesting; +import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; +import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; import 
org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager; +import org.apache.ozone.rocksdb.util.SstFileSetReader; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,28 +99,24 @@ public class SnapshotDefragService extends BackgroundService implements BootstrapStateHandler { - private static final Logger LOG = - LoggerFactory.getLogger(SnapshotDefragService.class); + private static final Logger LOG = LoggerFactory.getLogger(SnapshotDefragService.class); // Use only a single thread for snapshot defragmentation to avoid conflicts private static final int DEFRAG_CORE_POOL_SIZE = 1; + private static final String CHECKPOINT_STATE_DEFRAGGED_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; private final OzoneManager ozoneManager; - private final AtomicLong runCount = new AtomicLong(0); - // Number of snapshots to be processed in a single iteration private final long snapshotLimitPerTask; - private final AtomicLong snapshotsDefraggedCount; + private final AtomicLong runCount = new AtomicLong(0); private final AtomicBoolean running; - private final MultiSnapshotLocks snapshotIdLocks; - private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager, OzoneConfiguration configuration) { - super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, + super(SnapshotDefragService.class.getSimpleName(), interval, unit, DEFRAG_CORE_POOL_SIZE, serviceTimeout, ozoneManager.getThreadNamePrefix()); this.ozoneManager = ozoneManager; this.snapshotLimitPerTask = configuration @@ -127,11 +159,9 @@ private boolean isRocksToolsNativeLibAvailable() { /** * Checks if a snapshot needs defragmentation by examining its YAML metadata. 
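For illustration, the decision could be read straight from the snapshot's local-data YAML file; a minimal sketch with SnakeYAML, assuming a top-level "needsDefrag" key as a stand-in (the on-disk OmSnapshotLocalData schema is not part of this diff, and the service itself always goes through OmSnapshotLocalDataManager as shown below):

    // Hypothetical sketch only: the YAML field name "needsDefrag" is assumed, not taken from this patch.
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import org.yaml.snakeyaml.Yaml;

    public final class SnapshotLocalDataPeek {
      private SnapshotLocalDataPeek() { }

      static boolean needsDefrag(Path snapshotLocalDataYaml) throws IOException {
        try (InputStream in = Files.newInputStream(snapshotLocalDataYaml)) {
          Map<String, Object> data = new Yaml().load(in);
          // Treat a missing flag as "needs defragmentation" so new snapshots get processed.
          return (boolean) data.getOrDefault("needsDefrag", true);
        }
      }
    }
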
*/ - private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { - String snapshotPath = OmSnapshotManager.getSnapshotPath( - ozoneManager.getConfiguration(), snapshotInfo); - - try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider readableOmSnapshotLocalDataProvider = + @VisibleForTesting + public boolean needsDefragmentation(SnapshotInfo snapshotInfo) { + try (ReadableOmSnapshotLocalDataProvider readableOmSnapshotLocalDataProvider = ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo)) { // Read snapshot local metadata from YAML // Check if snapshot needs compaction (defragmentation) @@ -146,24 +176,716 @@ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { } } + /** + * Returns a lexicographically higher string by appending a byte with maximum value. + * For example, "/" -> "/\xFF" + */ + private String getLexicographicallyHigherString(String str) { + return str + "\uFFFF"; + } + + /** + * Deletes unwanted key ranges from a column family. + * Deletes ranges: ["", prefix) and [lexHigher(prefix), lexHigher("/")] + */ + private void deleteUnwantedRanges(RocksDatabase db, RocksDatabase.ColumnFamily cf, + String prefix, String cfName) throws RocksDatabaseException { + if (cf == null) { + LOG.warn("Column family {} not found, skipping range deletion", cfName); + return; + } + + try (ManagedWriteBatch writeBatch = new ManagedWriteBatch()) { + // Delete range ["", prefix) + byte[] emptyKey = "".getBytes(StandardCharsets.UTF_8); + byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8); + cf.batchDeleteRange(writeBatch, emptyKey, prefixBytes); + + // Delete range [lexicographicalHigherString(prefix), lexicographicalHigherString("/")] + String highPrefixStr = getLexicographicallyHigherString(prefix); + byte[] highPrefix = highPrefixStr.getBytes(StandardCharsets.UTF_8); + byte[] highSlash = getLexicographicallyHigherString("/").getBytes(StandardCharsets.UTF_8); + cf.batchDeleteRange(writeBatch, highPrefix, highSlash); + + db.batchWrite(writeBatch); + LOG.debug("Deleted unwanted ranges from {}", cfName); + } + } + + /** + * Processes the checkpoint DB: deletes unwanted ranges and compacts. + */ + private void processCheckpointDb( + String checkpointDirName, RocksDatabase originalDb, SnapshotInfo snapshotInfo, + int nextVersion) throws IOException { + // Step 2: Create checkpoint MetadataManager after taking a checkpoint + LOG.debug("Opening checkpoint DB for defragmentation. checkpointDirName = {}", checkpointDirName); + + final String volumeName = snapshotInfo.getVolumeName(); + final String bucketName = snapshotInfo.getBucketName(); + OzoneConfiguration conf = ozoneManager.getConfiguration(); + final int maxOpenSstFilesInSnapshotDb = conf.getInt( + OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT); + + final String snapshotDirName = checkpointDirName.substring(OM_DB_NAME.length()); + try (OMMetadataManager defragDbMetadataManager = new OmMetadataManagerImpl( + conf, snapshotDirName, maxOpenSstFilesInSnapshotDb, nextVersion)) { + LOG.debug("Opened checkpoint DB for defragmentation"); + try (RDBStore rdbStore = (RDBStore) defragDbMetadataManager.getStore()) { + try (RocksDatabase checkpointDb = rdbStore.getDb()) { + + // Step 3-5: DeleteRange from tables + final String obsPrefix = defragDbMetadataManager.getOzoneKey(volumeName, bucketName, OM_KEY_PREFIX); + LOG.debug("Deleting unwanted ranges from KeyTable. 
obsPrefix = {}", obsPrefix); + deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(KEY_TABLE), + obsPrefix, KEY_TABLE); + + final String fsoPrefix = defragDbMetadataManager.getOzoneKeyFSO(volumeName, bucketName, OM_KEY_PREFIX); + LOG.debug("Deleting unwanted ranges from DirectoryTable. fsoPrefix = {}", fsoPrefix); + deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(DIRECTORY_TABLE), + fsoPrefix, DIRECTORY_TABLE); + + LOG.debug("Deleting unwanted ranges from FileTable. fsoPrefix = {}", fsoPrefix); + deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(FILE_TABLE), + fsoPrefix, FILE_TABLE); + + // Do we need to drop other tables here as well? + + // Step 6: Force compact all tables in the checkpoint + LOG.debug("Force compacting all tables in checkpoint DB"); + try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) { + compactOptions.setChangeLevel(true); + compactOptions.setBottommostLevelCompaction( + ManagedCompactRangeOptions.BottommostLevelCompaction.kForce); + checkpointDb.compactDB(compactOptions); + } + LOG.debug("Completed force compaction of all tables"); + + // Verify defrag DB integrity + verifyDbIntegrity(originalDb, checkpointDb, snapshotInfo); + + // Update snapshot metadata to mark defragmentation as complete + updateSnapshotMetadataAfterDefrag(rdbStore, snapshotInfo); + } + } + } catch (IOException e) { + throw new IOException("Failed to process checkpoint DB", e); + } catch (Exception e) { + // Handle Exception from AutoCloseable.close() + throw new IOException("Failed to process checkpoint DB", e); + } + } + + /** + * Deletes the old defragmented DB version if it exists. + * This is called after successfully creating a new defragmented DB version + * to clean up the previous version and save disk space. + * When oldVersion is 0, deletes the original DB from checkpointState/. + * When oldVersion > 0, deletes the previous defragmented DB from checkpointStateDefragged/. + * + * @param snapshotInfo the snapshot information + * @param newVersion the newly created version number + * @param parentDir the parent directory containing the defragmented DBs + */ + private void deleteOldDb(SnapshotInfo snapshotInfo, int newVersion, String parentDir) { + + // Invalidate SnapshotCache entry, otherwise the cache could hold on to the old DB handle and cause issues + ozoneManager.getOmSnapshotManager().invalidateCacheEntry(snapshotInfo.getSnapshotId()); + + int oldVersion = newVersion - 1; + if (oldVersion < 0) { + LOG.error("Invalid oldVersion: {}", oldVersion); + return; + } + + String oldCheckpointDirName; + String oldDbPath; + + if (oldVersion == 0) { + // Delete original DB from checkpointState/ directory + oldCheckpointDirName = OM_DB_NAME + snapshotInfo.getCheckpointDirName(); + oldDbPath = Paths.get(parentDir, OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR, + oldCheckpointDirName).toString(); + } else { + // Delete previous defragmented DB version from checkpointStateDefragged/ directory + String oldVersionSuffix = OzoneConsts.SNAPSHOT_DEFRAG_VERSION_SUFFIX_PREFIX + oldVersion; + oldCheckpointDirName = OM_DB_NAME + snapshotInfo.getCheckpointDirName() + oldVersionSuffix; + oldDbPath = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, + oldCheckpointDirName).toString(); + } + + try { + if (Files.exists(Paths.get(oldDbPath))) { + FileUtils.deleteDirectory(new File(oldDbPath)); + LOG.debug("Deleted old {} DB version {} at: {}", + oldVersion == 0 ? 
"original" : "defragged", oldVersion, oldDbPath); + } + } catch (IOException e) { + LOG.warn("Failed to delete old DB at: {}", oldDbPath, e); + } + } + /** * Performs full defragmentation for the first snapshot in the chain. - * This is a simplified implementation that demonstrates the concept. + * Steps: + * 1. Take checkpoint of current DB + * 2. Create checkpoint MetadataManager after taking a checkpoint + * 3. DeleteRange KeyTable from ["", obsPrefix) + + * [lexicographicalHigherString(keyTablePrefix), lexicographicalHigherString("/")] + * 4. DeleteRange DirectoryTable from ["", fsoPrefix) + + * [lexicographicalHigherString(fsoPrefix), lexicographicalHigherString("/")] + * 5. DeleteRange FileTable from ["", fsoPrefix) + + * [lexicographicalHigherString(fsoPrefix), lexicographicalHigherString("/")] + * 6. Force compact all tables in the checkpoint. */ private void performFullDefragmentation(SnapshotInfo snapshotInfo, OmSnapshot omSnapshot) throws IOException { - // TODO: Implement full defragmentation + int nextVersion = OmSnapshotUtils.getLocalDataVersion(ozoneManager, snapshotInfo) + 1; + String versionSuffix = OzoneConsts.SNAPSHOT_DEFRAG_VERSION_SUFFIX_PREFIX + nextVersion; + + String checkpointDirNameWithoutSuffix = OM_DB_NAME + snapshotInfo.getCheckpointDirName(); + // parent dir of db.snapshots + String parentDir = OMStorage.getOmDbDir(ozoneManager.getConfiguration()).toString(); + String defraggedDbPath = + Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, checkpointDirNameWithoutSuffix).toString(); + + String snapshotPath = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo); + LOG.info("Starting full defrag for snapshot: {} at path: {}. Target DB path: {}", + snapshotInfo.getName(), snapshotPath, defraggedDbPath); + + // Get snapshot checkpoint DB + RDBStore originalStore = (RDBStore) omSnapshot.getMetadataManager().getStore(); + RocksDatabase originalDb = originalStore.getDb(); + if (originalDb == null) { + throw new IOException("Failed to get RocksDatabase from original snapshot store"); + } + assert !originalDb.isClosed(); + + try { + LOG.debug("Starting defrag for snapshot: {}", snapshotInfo.getName()); + + // Step 1: Take checkpoint of current DB + LOG.debug("Creating checkpoint from original DB to defragmented path"); + try (RocksDatabase.RocksCheckpoint checkpoint = originalDb.createCheckpoint()) { + final String dbPathWithVersionSuffix = defraggedDbPath + versionSuffix; + checkpoint.createCheckpoint(Paths.get(dbPathWithVersionSuffix)); + LOG.debug("Created checkpoint at: {}", dbPathWithVersionSuffix); + } + + // Steps 2-6: Process checkpoint DB (delete ranges and compact) + processCheckpointDb(checkpointDirNameWithoutSuffix, originalDb, snapshotInfo, nextVersion); + + LOG.debug("Successfully completed full defragmentation for snapshot: {}", snapshotInfo.getName()); + + // Delete old DB, if any + deleteOldDb(snapshotInfo, nextVersion, parentDir); + + } catch (RocksDatabaseException e) { + LOG.error("RocksDB error during defragmentation of snapshot: {}", snapshotInfo.getName(), e); + throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e); + } catch (Exception e) { + LOG.error("Unexpected error during defragmentation of snapshot: {}", snapshotInfo.getName(), e); + throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e); + } } /** * Performs incremental defragmentation using diff from previous defragmented snapshot. 
*/ - private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot, - SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot) + private void performIncrementalDefragmentation(SnapshotInfo currentSnapshotInfo, + SnapshotInfo previousSnapshotInfo, OmSnapshot currentOmSnapshot) throws IOException { - // TODO: Implement incremental defragmentation + // previous snapshot's current version + int previousCurrVersion = OmSnapshotUtils.getLocalDataVersion(ozoneManager, previousSnapshotInfo); + String previousVersionSuffix = OzoneConsts.SNAPSHOT_DEFRAG_VERSION_SUFFIX_PREFIX + previousCurrVersion; + String previousCheckpointDirNameWithoutSuffix = OM_DB_NAME + previousSnapshotInfo.getCheckpointDirName(); + String previousParentDir = OMStorage.getOmDbDir(ozoneManager.getConfiguration()).toString(); + String previousDefraggedDbPath = + Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, previousCheckpointDirNameWithoutSuffix).toString(); + + // current snapshot's next version + int currentNextVersion = OmSnapshotUtils.getLocalDataVersion(ozoneManager, currentSnapshotInfo) + 1; + String currentVersionSuffix = OzoneConsts.SNAPSHOT_DEFRAG_VERSION_SUFFIX_PREFIX + currentNextVersion; + String currentCheckpointDirNameWithoutSuffix = OM_DB_NAME + currentSnapshotInfo.getCheckpointDirName(); + String currentParentDir = OMStorage.getOmDbDir(ozoneManager.getConfiguration()).toString(); + String currentDefraggedDbPath = + Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, currentCheckpointDirNameWithoutSuffix).toString(); + + LOG.info("Starting incremental defrag for snapshot: {} using previous defragged snapshot: {}. " + + "Previous defragged DB path: {}. Target DB path: {}", + currentSnapshotInfo.getName(), previousSnapshotInfo.getName(), previousDefraggedDbPath, currentDefraggedDbPath); + + // Note: Don't create target directory - RocksDB createCheckpoint() will create it + // and will fail with "Directory exists" if we create it first + + try { + // Check if previous defragmented DB exists + if (!Files.exists(Paths.get(previousDefraggedDbPath + previousVersionSuffix))) { + // Fail fast: throw exception instead of falling back to full defragmentation + String errorMsg = String.format("Previous defragmented DB not found at '%s' for snapshot '%s'. " + + "Incremental defragmentation cannot proceed.", + previousDefraggedDbPath, previousSnapshotInfo.getName()); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + + // Create a checkpoint from the previous defragmented DB directly at target location + LOG.debug("Creating checkpoint from previous defragmented DB directly to target location"); + + final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager(); + try (UncheckedAutoCloseableSupplier prevSnapshotSupplier = + snapshotManager.getSnapshot(previousSnapshotInfo.getSnapshotId())) { + LOG.debug("Opened previous (defragged) snapshot: {}", previousSnapshotInfo.getName()); + OmSnapshot prevSnapshot = prevSnapshotSupplier.get(); + // Sanity check: Ensure previous snapshot is marked as defragmented + if (needsDefragmentation(previousSnapshotInfo)) { + LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.", + previousSnapshotInfo.getName()); + throw new IOException("Previous snapshot " + previousSnapshotInfo.getName() + + " is not marked as defragmented. 
Cannot proceed with incremental defragmentation."); + } + RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore(); + RocksDatabase prevDb = prevStore.getDb(); + if (prevDb == null) { + throw new IOException("Failed to get RocksDatabase from previous defragmented snapshot store"); + } + assert !prevDb.isClosed(); + + try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) { + final String dbPathWithVersionSuffix = currentDefraggedDbPath + currentVersionSuffix; + checkpoint.createCheckpoint(Paths.get(dbPathWithVersionSuffix)); + LOG.debug("Created checkpoint at: {}", dbPathWithVersionSuffix); + } + } + + OzoneConfiguration conf = ozoneManager.getConfiguration(); + final int maxOpenSstFilesInSnapshotDb = conf.getInt( + OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT); + + final String currSnapshotDirName = currentCheckpointDirNameWithoutSuffix.substring(OM_DB_NAME.length()); + try (OMMetadataManager currDefragDbMetadataManager = new OmMetadataManagerImpl( + conf, currSnapshotDirName, maxOpenSstFilesInSnapshotDb, currentNextVersion)) { + LOG.debug("Opened OMMetadataManager for checkpoint DB for incremental update"); + try (RDBStore currentDefraggedStore = (RDBStore) currDefragDbMetadataManager.getStore()) { + try (RocksDatabase currentDefraggedDb = currentDefraggedStore.getDb()) { + + // Apply incremental changes from current snapshot + RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore(); + RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb(); + + long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore, + currentSnapshotInfo, previousSnapshotInfo); + + LOG.debug("Applied {} incremental changes for snapshot: {}", + incrementalKeysCopied, currentSnapshotInfo.getName()); + + // Verify defrag DB integrity + if (!verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshotInfo)) { + throw new IOException("Defragmented DB integrity verification failed for snapshot: " + + currentSnapshotInfo.getName()); + } + + // Create a new version in YAML metadata, which also indicates that the defragmentation is complete + updateSnapshotMetadataAfterDefrag(currentDefraggedStore, currentSnapshotInfo); + + LOG.debug("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes", + currentSnapshotInfo.getName(), incrementalKeysCopied); + + // Delete old DB, if any + deleteOldDb(currentSnapshotInfo, currentNextVersion, currentParentDir); + } + } + } + + } catch (RocksDatabaseException e) { + LOG.error("RocksDB error during incremental defragmentation of snapshot: {}", + currentSnapshotInfo.getName(), e); + throw new IOException("RocksDB error during incremental defragmentation of snapshot: " + + currentSnapshotInfo.getName(), e); + } catch (Exception e) { + LOG.error("Unexpected error during incremental defragmentation of snapshot: {}", + currentSnapshotInfo.getName(), e); + throw new IOException("Unexpected error during incremental defragmentation of snapshot: " + + currentSnapshotInfo.getName(), e); + } + } + + /** + * Applies incremental changes by using SnapshotDiffManager#getDeltaFiles() to get SST files, + * then iterating through keys using SstFileSetReader with byte-level comparison. + * Uses RDBSstFileWriter to directly write changes to SST files and then ingests them. 
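The RDBSstFileWriter plus Table#loadFromFile combination used below corresponds to RocksDB's external-SST-file workflow; a minimal sketch with the plain org.rocksdb API, assuming the changed keys arrive in ascending order (an SstFileWriter requirement) and that a null value marks a key deleted since the previous snapshot:

    import java.util.Collections;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    public final class SstIngestSketch {
      private SstIngestSketch() { }

      /** Write changed keys into a standalone SST file, then ingest it into the target column family. */
      static void writeAndIngest(RocksDB targetDb, ColumnFamilyHandle targetCf,
          String sstPath, byte[][] keys, byte[][] values) throws RocksDBException {
        try (EnvOptions envOptions = new EnvOptions();
             Options options = new Options();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
          writer.open(sstPath);
          for (int i = 0; i < keys.length; i++) {
            if (values[i] != null) {
              writer.put(keys[i], values[i]); // key added or changed since the previous snapshot
            } else {
              writer.delete(keys[i]);         // key removed since the previous snapshot (tombstone)
            }
          }
          writer.finish();
        }
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
          ingestOptions.setMoveFiles(true);   // move rather than copy the SST file into the DB
          targetDb.ingestExternalFile(targetCf, Collections.singletonList(sstPath), ingestOptions);
        }
      }
    }
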
+ */ + @SuppressWarnings("checkstyle:MethodLength") + private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore, + SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws IOException { + + LOG.debug("Applying incremental changes for snapshot: '{}' since: '{}' using delta files approach", + currentSnapshot.getName(), previousSnapshot.getName()); + + long totalChanges = 0; + + // Create temporary directory for SST files + String parentDir = OMStorage.getOmDbDir(ozoneManager.getConfiguration()).toString(); + String omSnapshotDir = Paths.get(parentDir, OM_SNAPSHOT_DIR).toString(); + String tempSstDir = Paths.get(omSnapshotDir, OzoneConsts.TEMP_DIFF_SST_FILES_DIR).toString(); + String diffDir = Paths.get(omSnapshotDir, CHECKPOINT_STATE_DEFRAGGED_DIR, + "deltaFilesDiff-" + UUID.randomUUID()).toString(); + + try { + Files.createDirectories(Paths.get(tempSstDir)); + LOG.debug("Created temporary SST directory: {}", tempSstDir); + Files.createDirectories(Paths.get(diffDir)); + LOG.debug("Created diff directory: {}", diffDir); + + // Get OmSnapshotManager + OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager(); + + // Get OmSnapshot instances for previous and current snapshots + try (UncheckedAutoCloseableSupplier previousSnapshotSupplier = + snapshotManager.getSnapshot(previousSnapshot.getSnapshotId()); + UncheckedAutoCloseableSupplier currentSnapshotSupplier = + snapshotManager.getSnapshot(currentSnapshot.getSnapshotId())) { + + OmSnapshot previousOmSnapshot = previousSnapshotSupplier.get(); + OmSnapshot currentOmSnapshot = currentSnapshotSupplier.get(); + + // Get the SnapshotDiffManager + SnapshotDiffManager diffManager = snapshotManager.getSnapshotDiffManager(); + + // Get column family to key prefix map for filtering SST files + Map tablePrefixes = getColumnFamilyToKeyPrefixMap( + ozoneManager.getMetadataManager(), + currentSnapshot.getVolumeName(), + currentSnapshot.getBucketName()); + + // Get table references for target database + RDBStore targetRdbStore = (RDBStore) targetStore; + RocksDatabase targetDb = targetRdbStore.getDb(); + if (targetDb == null) { + throw new IOException("Failed to get RocksDatabase from target store"); + } + + RDBStore previousDefraggedStore = (RDBStore) targetStore; // The previous defragged DB is our target base + RocksDatabase previousDefraggedDb = previousDefraggedStore.getDb(); + if (previousDefraggedDb == null) { + throw new IOException("Failed to get RocksDatabase from previous defragmented store"); + } + + // Process each column family + for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) { + RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily previousCf = previousDefraggedDb.getColumnFamily(cfName); + RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName); + + if (currentCf == null || previousCf == null || targetCf == null) { + LOG.warn("Column family {} not found in one of the databases, skipping incremental changes", cfName); + continue; + } + + Table targetTable = targetRdbStore.getTable(cfName); + if (targetTable == null) { + LOG.warn("Table {} not found in target store, skipping", cfName); + continue; + } + + // Get delta files for this column family + List tablesToLookUp = Collections.singletonList(cfName); + Set deltaFiles; + try { + deltaFiles = diffManager.getDeltaFiles( + previousOmSnapshot, currentOmSnapshot, + tablesToLookUp, + previousSnapshot, currentSnapshot, + false, // useFullDiff = false + 
tablePrefixes, + diffDir, + "defrag-" + currentSnapshot.getSnapshotId() // jobKey + ); + LOG.debug("Got {} delta SST files for column family: {}", deltaFiles.size(), cfName); + } catch (Exception e) { + LOG.error("Failed to get delta files for column family {}: {}", cfName, e.getMessage(), e); + continue; + } + + if (deltaFiles.isEmpty()) { + LOG.debug("No delta files for column family {}, skipping", cfName); + continue; + } + + AtomicLong cfChanges = new AtomicLong(0); + String sstFileName = cfName + "_" + currentSnapshot.getSnapshotId() + ".sst"; + File sstFile = new File(tempSstDir, sstFileName); + + LOG.debug("Creating SST file for column family {} changes: {}", cfName, sstFile.getAbsolutePath()); + + // Use SstFileSetReader to read keys from delta files + SstFileSetReader sstFileReader = new SstFileSetReader(deltaFiles); + + try (RDBSstFileWriter sstWriter = new RDBSstFileWriter(sstFile)) { + + // Get key stream with tombstones from delta files + try (Stream keysToCheck = sstFileReader.getKeyStreamWithTombstone(null, null)) { + + keysToCheck.forEach(keyStr -> { + try { + byte[] key = keyStr.getBytes(StandardCharsets.UTF_8); + + // Get values from previous defragmented snapshot and current snapshot + byte[] previousValue = previousDefraggedDb.get(previousCf, key); + byte[] currentValue = currentSnapshotDb.get(currentCf, key); + + // Byte-level comparison: only write if values are different + if (!Arrays.equals(previousValue, currentValue)) { + if (currentValue != null) { + // Key exists in current snapshot - write the new value + sstWriter.put(key, currentValue); + } else { + // Key doesn't exist in current snapshot (deleted) - write tombstone + sstWriter.delete(key); + } + + // Increment change counter + cfChanges.getAndIncrement(); + } + } catch (Exception e) { + LOG.error("Error processing key {} for column family {}: {}", + keyStr, cfName, e.getMessage(), e); + throw new RuntimeException(e); + } + }); + + } catch (RocksDBException e) { + LOG.error("RocksDB error reading keys from delta files for column family {}: {}", + cfName, e.getMessage(), e); + } + + LOG.debug("Finished writing {} changes for column family: {} to SST file", + cfChanges.get(), cfName); + + } catch (Exception e) { + LOG.error("Error processing column family {} for snapshot {}: {}", + cfName, currentSnapshot.getName(), e.getMessage(), e); + } + + // Ingest SST file into target database if there were changes + if (sstFile.exists() && sstFile.length() > 0) { + try { + targetTable.loadFromFile(sstFile); + LOG.debug("Successfully ingested SST file for column family {}: {} changes", + cfName, cfChanges); + } catch (Exception e) { + LOG.error("Failed to ingest SST file for column family {}: {}", cfName, e.getMessage(), e); + throw new IOException("Failed to ingest SST file for column family: " + cfName, e); + } + } else { + LOG.debug("No changes to ingest for column family {}", cfName); + } + + // Clean up SST file after ingestion + try { + if (sstFile.exists()) { + Files.delete(sstFile.toPath()); + LOG.debug("Cleaned up SST file: {}", sstFile.getAbsolutePath()); + } + } catch (IOException e) { + LOG.warn("Failed to clean up SST file: {}", sstFile.getAbsolutePath(), e); + } + + totalChanges += cfChanges.get(); + LOG.debug("Applied {} incremental changes for column family: {}", cfChanges, cfName); + } + } + + // Clean up temporary directories + try { + FileUtils.deleteDirectory(new File(tempSstDir)); + LOG.debug("Cleaned up temporary SST directory: {}", tempSstDir); + } catch (IOException e) { + LOG.warn("Failed to 
clean up temporary SST directory: {}", tempSstDir, e); + } + + try { + org.apache.commons.io.FileUtils.deleteDirectory(new File(diffDir)); + LOG.debug("Cleaned up diff directory: {}", diffDir); + } catch (IOException e) { + LOG.warn("Failed to clean up diff directory: {}", diffDir, e); + } + + } catch (IOException e) { + throw new IOException("applyIncrementalChanges failed", e); + } + + LOG.debug("Applied {} total incremental changes using delta files approach", totalChanges); + return totalChanges; + } + + /** + * Updates the snapshot metadata in the YAML file after defragmentation. + * Creates a new version in the metadata with the defragmented SST file list, + * marks the snapshot as no longer needing defragmentation, and sets the last defrag time. + */ + private void updateSnapshotMetadataAfterDefrag(RDBStore defraggedStore, SnapshotInfo snapshotInfo) + throws IOException { + LOG.debug("Updating snapshot metadata for defragmented snapshot: {}", snapshotInfo.getName()); + + final OmSnapshotLocalDataManager localDataManager = + ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager(); + try (WritableOmSnapshotLocalDataProvider writableProvider = + localDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) { + // Add a new version with the defragmented SST file list + writableProvider.addSnapshotVersion(defraggedStore); + // Get the snapshot local data to update flags + OmSnapshotLocalData snapshotLocalData = writableProvider.getSnapshotLocalData(); + snapshotLocalData.setNeedsDefrag(false); + snapshotLocalData.setSstFiltered(true); + snapshotLocalData.setLastDefragTime(System.currentTimeMillis()); + writableProvider.commit(); + + LOG.debug("Successfully updated snapshot metadata for snapshot: {} with new version: {}", + snapshotInfo.getName(), snapshotLocalData.getVersion()); + } catch (IOException e) { + LOG.error("Failed to update snapshot metadata for snapshot: {}", snapshotInfo.getName(), e); + throw e; + } + } + + /** + * Verifies DB integrity by comparing key counts and spot-checking keys/values + * between the original and defragmented databases. 
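The verification below is essentially a paired scan of each column family; a condensed sketch of the same check using plain org.rocksdb iterators (the patch uses the ManagedRocksIterator wrapper and additionally logs progress and caps the number of reported errors):

    import java.util.Arrays;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    public final class IntegrityCheckSketch {
      private IntegrityCheckSketch() { }

      /** True when every key/value of the original column family is present and identical in the defragged one. */
      static boolean sameContents(RocksDB originalDb, ColumnFamilyHandle originalCf,
          RocksDB defraggedDb, ColumnFamilyHandle defraggedCf) throws RocksDBException {
        long originalKeys = 0;
        try (RocksIterator it = originalDb.newIterator(originalCf)) {
          for (it.seekToFirst(); it.isValid(); it.next()) {
            originalKeys++;
            byte[] defraggedValue = defraggedDb.get(defraggedCf, it.key());
            if (defraggedValue == null || !Arrays.equals(it.value(), defraggedValue)) {
              return false; // missing key or value mismatch
            }
          }
        }
        // Count the defragged side as well so extra (leaked) keys are also detected.
        long defraggedKeys = 0;
        try (RocksIterator it = defraggedDb.newIterator(defraggedCf)) {
          for (it.seekToFirst(); it.isValid(); it.next()) {
            defraggedKeys++;
          }
        }
        return originalKeys == defraggedKeys;
      }
    }
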
+
+  /**
+   * Verifies DB integrity by comparing per-column-family key counts and then checking
+   * every key/value pair between the original and defragmented databases.
+   */
+  private boolean verifyDbIntegrity(RocksDatabase originalDb, RocksDatabase defraggedDb,
+      SnapshotInfo snapshotInfo) {
+
+    LOG.info("Starting DB integrity verification for snapshot: {}", snapshotInfo.getName());
+
+    boolean verificationPassed = true;
+    long totalOriginalKeys = 0;
+    long totalDefraggedKeys = 0;
+
+    for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+      LOG.debug("Verifying column family: {} for snapshot: {}",
+          cfName, snapshotInfo.getName());
+
+      RocksDatabase.ColumnFamily originalCf = originalDb.getColumnFamily(cfName);
+      RocksDatabase.ColumnFamily defraggedCf = defraggedDb.getColumnFamily(cfName);
+
+      if (originalCf == null) {
+        LOG.warn("Column family {} not found in original DB, skipping verification", cfName);
+        continue;
+      }
+
+      if (defraggedCf == null) {
+        LOG.error("Column family {} not found in defragmented DB, verification failed", cfName);
+        verificationPassed = false;
+        continue;
+      }
+
+      try {
+        // Count keys in original DB
+        long originalKeyCount = 0;
+        try (ManagedRocksIterator originalIterator = originalDb.newIterator(originalCf)) {
+          originalIterator.get().seekToFirst();
+          while (originalIterator.get().isValid()) {
+            originalKeyCount++;
+            originalIterator.get().next();
+          }
+        }
+
+        // Count keys in defragmented DB
+        long defraggedKeyCount = 0;
+        try (ManagedRocksIterator defraggedIterator = defraggedDb.newIterator(defraggedCf)) {
+          defraggedIterator.get().seekToFirst();
+          while (defraggedIterator.get().isValid()) {
+            defraggedKeyCount++;
+            defraggedIterator.get().next();
+          }
+        }
+
+        totalOriginalKeys += originalKeyCount;
+        totalDefraggedKeys += defraggedKeyCount;
+
+        // Verify key counts match
+        if (originalKeyCount != defraggedKeyCount) {
+          LOG.error("Key count mismatch for column family {}: original={}, defragmented={}",
+              cfName, originalKeyCount, defraggedKeyCount);
+          verificationPassed = false;
+        } else {
+          LOG.info("Key count verification passed for column family {}: {} keys",
+              cfName, originalKeyCount);
+        }
+
+        // Full verification - check every single key-value pair
+        long fullCheckCount = 0;
+        long fullCheckErrors = 0;
+
+        try (ManagedRocksIterator originalIterator = originalDb.newIterator(originalCf)) {
+          originalIterator.get().seekToFirst();
+
+          while (originalIterator.get().isValid()) {
+            byte[] originalKey = originalIterator.get().key();
+            byte[] originalValue = originalIterator.get().value();
+
+            // Get the same key from defragmented DB
+            byte[] defraggedValue = defraggedDb.get(defraggedCf, originalKey);
+
+            if (defraggedValue == null) {
+              LOG.error("Key missing in defragmented DB for column family {}: key #{}",
+                  cfName, fullCheckCount);
+              verificationPassed = false;
+              fullCheckErrors++;
+            } else if (!java.util.Arrays.equals(originalValue, defraggedValue)) {
+              LOG.error("Value mismatch for column family {}: key #{}",
+                  cfName, fullCheckCount);
+              verificationPassed = false;
+              fullCheckErrors++;
+            }
+
+            fullCheckCount++;
+
+            // Log progress every 10,000 keys to avoid log spam
+            if (fullCheckCount % 10000 == 0) {
+              LOG.debug("Full verification progress for column family {}: checked {} keys, {} errors so far",
+                  cfName, fullCheckCount, fullCheckErrors);
+            }
+
+            if (fullCheckErrors > 10) {
+              LOG.warn("Too many errors found during full verification for column family {}, stopping further checks",
+                  cfName);
+              break; // Stop if too many errors to avoid flooding logs
+            }
+
+            originalIterator.get().next();
+          }
+        }
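        // Annotation (not part of the change): keys that exist only in the defragmented DB are
        // not visited by the loop above; they surface indirectly through the key-count
        // comparison performed earlier in this method.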
+
+        if (fullCheckErrors == 0) {
+          LOG.info("Full verification PASSED for column family {}: all {} keys verified successfully",
+              cfName, fullCheckCount);
+        } else {
+          LOG.error("Full verification FAILED for column family {}: {} errors found out of {} keys checked",
+              cfName, fullCheckErrors, fullCheckCount);
+        }
+
+      } catch (IOException e) {
+        LOG.error("Error during verification of column family {} for snapshot {}: {}",
+            cfName, snapshotInfo.getName(), e.getMessage(), e);
+        verificationPassed = false;
+      }
+    }
+
+    // Log final verification results
+    if (verificationPassed) {
+      LOG.info("DB integrity verification PASSED for snapshot: {} " +
+          "(total original keys: {}, total defragmented keys: {})",
+          snapshotInfo.getName(), totalOriginalKeys, totalDefraggedKeys);
+    } else {
+      LOG.error("DB integrity verification FAILED for snapshot: {} " +
+          "(total original keys: {}, total defragmented keys: {})",
+          snapshotInfo.getName(), totalOriginalKeys, totalDefraggedKeys);
+    }
+    return verificationPassed;
+  }
+
   private final class SnapshotDefragTask implements BackgroundTask {
@@ -234,7 +956,7 @@ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
           continue;
         }
 
-        LOG.info("Will defrag snapshot: {} (ID: {})",
+        LOG.debug("Will defrag snapshot: {} (ID: {})",
             snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
 
         // Acquire MultiSnapshotLocks
@@ -246,7 +968,7 @@ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
         }
 
         try {
-          LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
+          LOG.debug("Processing snapshot defragmentation for: {} (ID: {})",
              snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
 
           // Get snapshot through SnapshotCache for proper locking
@@ -258,14 +980,14 @@ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
           UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId();
           boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null;
           if (isFirstSnapshotInPath) {
-            LOG.info("Performing full defragmentation for first snapshot (in path): {}",
+            LOG.debug("Performing full defragmentation for first snapshot (in path): {}",
                 snapshotToDefrag.getName());
             performFullDefragmentation(snapshotToDefrag, omSnapshot);
           } else {
             final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId);
             SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey);
-            LOG.info("Performing incremental defragmentation for snapshot: {} " +
+            LOG.debug("Performing incremental defragmentation for snapshot: {} " +
                 "based on previous defragmented snapshot: {}",
                 snapshotToDefrag.getName(), previousDefraggedSnapshot.getName());
@@ -278,27 +1000,25 @@ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
               break;
             }
 
-            performIncrementalDefragmentation(snapshotToDefrag,
-                previousDefraggedSnapshot, omSnapshot);
+            performIncrementalDefragmentation(
+                snapshotToDefrag, previousDefraggedSnapshot, omSnapshot);
           }
 
-          // TODO: Update snapshot metadata here?
+          // Note: deleteOldDb() already invalidates the SnapshotCache entry for this snapshot
 
-          // Close and evict the original snapshot DB from SnapshotCache
-          // TODO: Implement proper eviction from SnapshotCache
-          LOG.info("Defragmentation completed for snapshot: {}",
-              snapshotToDefrag.getName());
+          LOG.info("Defragmentation completed for snapshot: {}", snapshotToDefrag.getName());
 
           snapshotLimit--;
           snapshotsDefraggedCount.getAndIncrement();
         } catch (OMException ome) {
           if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
-            LOG.info("Snapshot {} was deleted during defragmentation",
+            LOG.warn("Snapshot '{}' was deleted during defragmentation",
                 snapshotToDefrag.getName());
           } else {
             LOG.error("OMException during snapshot defragmentation for: {}",
                 snapshotToDefrag.getName(), ome);
+            return false;
           }
         }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
index 497c7a064b8b..0a70ecf744f8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java
@@ -33,7 +33,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -214,4 +218,27 @@ public static void linkFiles(File oldDir, File newDir) throws IOException {
       }
     }
   }
+
+  /**
+   * Retrieves the local data version for the given snapshot.
+   *
+   * The version is read from the snapshot's local data YAML metadata file.
+   * This version is used to track the state of defragmentation operations
+   * on the snapshot.
+   *
+   * @param ozoneManager OzoneManager used to reach the snapshot local data manager
+   * @param snapshotInfo SnapshotInfo object
+   * @return the version number from the snapshot's local data, or 0 if the
+   *         metadata cannot be read
+   */
+  public static int getLocalDataVersion(OzoneManager ozoneManager, SnapshotInfo snapshotInfo) {
+    try (ReadableOmSnapshotLocalDataProvider readableOmSnapshotLocalDataProvider =
+             ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo)) {
+      OmSnapshotLocalData snapshotLocalData = readableOmSnapshotLocalDataProvider.getSnapshotLocalData();
+      return snapshotLocalData.getVersion();
+    } catch (IOException e) {
+      LOG.warn("Failed to read YAML metadata for snapshot {}, assuming version 0",
+          snapshotInfo.getName(), e);
+      return 0;
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index ce79c32fc4ee..9f1bbe02923b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -153,6 +153,7 @@ public void invalidate(UUID key) {
       try {
         v.get().getMetadataManager().getStore().flushDB();
         v.get().close();
+        LOG.debug("Invalidated SnapshotId: '{}' in snapshot cache.", k);
       } catch (IOException e) {
         throw new IllegalStateException("Failed to close snapshotId: " + key, e);
       }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 21c2b5979a72..2f2ec263ec1c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -1166,18 +1166,16 @@ void addToObjectIdMap(Table fsTable,
     }
   }
 
-  @VisibleForTesting
   @SuppressWarnings("checkstyle:ParameterNumber")
-  Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
-      OmSnapshot toSnapshot,
-      List<String> tablesToLookUp,
-      SnapshotInfo fsInfo,
-      SnapshotInfo tsInfo,
-      boolean useFullDiff,
-      Map<String, String> tablePrefixes,
-      String diffDir, String jobKey)
+  public Set<String> getDeltaFiles(
+      OmSnapshot fromSnapshot, OmSnapshot toSnapshot,
+      List<String> tablesToLookUp,
+      SnapshotInfo fsInfo, SnapshotInfo tsInfo,
+      boolean useFullDiff,
+      Map<String, String> tablePrefixes,
+      String diffDir, String jobKey)
       throws IOException {
-    // TODO: [SNAPSHOT] Refactor the parameter list
+
     Optional<Set<String>> deltaFiles = Optional.empty();
 
     // Check if compaction DAG is available, use that if so
@@ -1587,6 +1585,11 @@ private synchronized void updateJobStatus(String jobKey,
   synchronized void recordActivity(String jobKey, SnapshotDiffResponse.SubStatus subStatus) {
     SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
+    if (snapshotDiffJob == null) {
+      // TODO: Record activity for defrag jobs as well somehow
+      LOG.warn("Snapshot diff job not found for jobKey = {}", jobKey);
+      return;
+    }
     snapshotDiffJob.setSubStatus(subStatus);
     snapDiffJobTable.put(jobKey, snapshotDiffJob);
     if (LOG.isDebugEnabled()) {
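For context, a minimal sketch of how the defrag path added above might be exercised from an integration test, in the spirit of the new TestSnapshotDefragService. Only triggerSnapshotDefragOnce() and OmSnapshotUtils.getLocalDataVersion() come from this change; the test class name, the ozoneManager/defragService fixtures, the volume/bucket/snapshot names, and the AssertJ usage are illustrative assumptions.

// Minimal sketch, assuming an initialized OM ("ozoneManager") and a handle to the service
// under test ("defragService"); the import for SnapshotDefragService is omitted because its
// package is introduced by this change.
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.junit.jupiter.api.Test;

class SnapshotDefragUsageSketch {

  private OzoneManager ozoneManager;           // assumed test fixture
  private SnapshotDefragService defragService; // assumed handle to the service under test

  @Test
  void defragBumpsSnapshotLocalDataVersion() throws Exception {
    SnapshotInfo info = ozoneManager.getMetadataManager().getSnapshotInfoTable()
        .get(SnapshotInfo.getTableKey("vol", "bucket", "snap1"));

    int versionBefore = OmSnapshotUtils.getLocalDataVersion(ozoneManager, info);

    // Run a single defrag round over eligible snapshots.
    defragService.triggerSnapshotDefragOnce();

    // A successful defrag records a new version in the snapshot's local-data YAML.
    int versionAfter = OmSnapshotUtils.getLocalDataVersion(ozoneManager, info);
    assertThat(versionAfter).isGreaterThan(versionBefore);
  }
}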