diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 0fb91f42d90a..f71ffe42197c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -145,6 +145,12 @@ TypedTable getTable(
    */
   Map getTableNames();
 
+  /**
+   * Drops the specified table.
+   * @param tableName - Name of the table to drop.
+   */
+  void dropTable(String tableName) throws RocksDatabaseException;
+
   /**
    * Get data written to DB since a specific sequence number.
    */
@@ -162,4 +168,6 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
    * @return true if the DB is closed.
    */
   boolean isClosed();
+
+  String getSnapshotsParentDir();
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index e3853a84211c..8e2e2b85426e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -189,6 +189,7 @@ public String getSnapshotMetadataDir() {
     return dbLocation.getParent() + OM_KEY_PREFIX + OM_SNAPSHOT_DIFF_DIR;
   }
 
+  @Override
   public String getSnapshotsParentDir() {
     return snapshotsParentDir;
   }
@@ -332,6 +333,29 @@ public Map getTableNames() {
     return db.getColumnFamilyNames();
   }
 
+  /**
+   * Drops a table from the database by removing its associated column family.
+   *
+ * Warning: This operation should be used with extreme caution. If the table needs to be used again, + * it is recommended to reinitialize the entire DB store, as the column family will be permanently + * removed from the database. This method is suitable for truncating a RocksDB column family in a single operation. + * + * @param tableName the name of the table to be dropped + * @throws RocksDatabaseException if an error occurs while attempting to drop the table + */ + @Override + public void dropTable(String tableName) throws RocksDatabaseException { + ColumnFamily columnFamily = db.getColumnFamily(tableName); + if (columnFamily != null) { + try { + db.getManagedRocksDb().get().dropColumnFamily(columnFamily.getHandle()); + } catch (RocksDBException e) { + throw new RocksDatabaseException("Failed to drop " + tableName, e); + } + } + } + public Collection getColumnFamilies() { return db.getExtraColumnFamilies(); } diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index baac362da741..461324d00221 100644 --- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.nio.file.Path; import java.time.Duration; import java.util.Iterator; import java.util.List; @@ -85,6 +86,13 @@ public interface OMMetadataManager extends DBStoreHAManager, AutoCloseable { @VisibleForTesting DBStore getStore(); + /** + * Retrieves the parent directory of all the snapshots in the system. + * + * @return a Path object representing the parent directory of the snapshot. + */ + Path getSnapshotParentDir(); + /** * Returns the OzoneManagerLock used on Metadata DB. * diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index b5137ed3d61a..a0dd69a97526 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -421,10 +421,14 @@ public void startSnapshotDefragService(OzoneConfiguration conf) { OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - snapshotDefragService = - new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS, - serviceTimeout, ozoneManager, conf); - snapshotDefragService.start(); + try { + snapshotDefragService = + new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS, + serviceTimeout, ozoneManager, conf); + snapshotDefragService.start(); + } catch (IOException e) { + LOG.error("Error starting Snapshot Defrag Service", e); + } } else { LOG.info("SnapshotDefragService is disabled. 
Snapshot defragmentation will not run periodically."); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index 6e79ca25ac8a..aeaf860c964d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -234,6 +234,11 @@ protected OmMetadataManagerImpl() { public static OmMetadataManagerImpl createCheckpointMetadataManager( OzoneConfiguration conf, DBCheckpoint checkpoint) throws IOException { + return createCheckpointMetadataManager(conf, checkpoint, true); + } + + public static OmMetadataManagerImpl createCheckpointMetadataManager( + OzoneConfiguration conf, DBCheckpoint checkpoint, boolean readOnly) throws IOException { Path path = checkpoint.getCheckpointLocation(); Path parent = path.getParent(); if (parent == null) { @@ -246,7 +251,11 @@ public static OmMetadataManagerImpl createCheckpointMetadataManager( throw new IllegalStateException("DB checkpoint dir name should not " + "have been null. Checkpoint path is " + path); } - return new OmMetadataManagerImpl(conf, dir, name.toString()); + return new OmMetadataManagerImpl(conf, dir, name.toString(), readOnly); + } + + protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name) throws IOException { + this(conf, dir, name, true); } /** @@ -257,7 +266,7 @@ public static OmMetadataManagerImpl createCheckpointMetadataManager( * @param name - Checkpoint directory name. * @throws IOException */ - protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name) + protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name, boolean readOnly) throws IOException { lock = new OmReadOnlyLock(); hierarchicalLockManager = new ReadOnlyHierarchicalResourceLockManager(); @@ -265,7 +274,7 @@ protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name) int maxOpenFiles = conf.getInt(OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT); this.store = newDBStoreBuilder(conf, name, dir) - .setOpenReadOnly(true) + .setOpenReadOnly(readOnly) .disableDefaultCFAutoCompaction(true) .setMaxNumberOfOpenFiles(maxOpenFiles) .setEnableCompactionDag(false) @@ -519,6 +528,11 @@ public DBStore getStore() { return store; } + @Override + public Path getSnapshotParentDir() { + return Paths.get(store.getSnapshotsParentDir()); + } + /** * Given a volume return the corresponding DB key. 
* diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index c3b9feae77fe..a1c7aa918fa0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om; +import static org.apache.commons.io.file.PathUtils.deleteDirectory; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString; import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME; @@ -97,6 +98,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; import org.apache.hadoop.ozone.om.snapshot.SnapshotCache; @@ -512,6 +514,64 @@ public void invalidateCacheEntry(UUID key) { } } + /** + * Deletes the snapshot checkpoint directories for a given snapshot ID up to + * the specified maximum version. This method ensures that all directories + * containing checkpoint data for the specified snapshot ID up to the max + * version are removed in a controlled manner. + * + * @param snapshotId The unique identifier of the snapshot whose checkpoint + * directories are to be deleted. + * @param maxVersion The maximum version of checkpoint directories to delete. + * If a value less than 0 is provided, it defaults to the + * current maximum version of the snapshot. + * @throws IOException If there is a failure acquiring the snapshot database + * lock or while deleting directories. + * @throws IllegalArgumentException If the specified maxVersion is greater + * than the current maximum version of the + * snapshot. + */ + public void deleteSnapshotCheckpointDirectories(UUID snapshotId, int maxVersion) throws IOException { + // Acquire Snapshot DBHandle lock before removing the older version to ensure all readers are done with the + // snapshot db use. + try (UncheckedAutoCloseableSupplier lock = getSnapshotCache().lock(snapshotId)) { + if (!lock.get().isLockAcquired()) { + throw new IOException("Failed to acquire dbHandlelock on snapshot: " + snapshotId); + } + try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataMetaProvider snapMetaProvider = + snapshotLocalDataManager.getOmSnapshotLocalDataMeta(snapshotId)) { + if (maxVersion < 0) { + maxVersion = snapMetaProvider.getMeta().getVersion(); + } + if (maxVersion > snapMetaProvider.getMeta().getVersion()) { + throw new IllegalArgumentException( + String.format("Max Version to be deleted can never be greater than the existing " + + "version of the snapshot. Argument passed : %d and snapshotMaxVersion : %d", maxVersion, + snapMetaProvider.getMeta().getVersion())); + } + // Binary search the smallest existing version and delete the older versions starting from the smallest version. + // This is to ensure efficient crash recovery. 
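      // Because deletion below always proceeds from the smallest version upward, the existing
      // version directories form a contiguous suffix of [0, maxVersion]; the binary search
      // locates the lower end of that suffix, so an interrupted cleanup can resume correctly.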
+ int smallestExistingVersion = 0; + int largestExistingVersion = maxVersion; + while (smallestExistingVersion <= largestExistingVersion) { + int midVersion = smallestExistingVersion + (largestExistingVersion - smallestExistingVersion) / 2; + Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, midVersion); + if (path.toFile().exists()) { + largestExistingVersion = midVersion - 1; + } else { + smallestExistingVersion = midVersion + 1; + } + } + // Delete the older version directories. Always starting deletes from smallest version to largest version to + // ensure binary search works correctly on a later basis. + for (int version = smallestExistingVersion; version <= maxVersion; version++) { + Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, version); + deleteDirectory(path); + } + } + } + } + /** * Creates snapshot checkpoint that corresponds to snapshotInfo. * @param omMetadataManager the metadata manager diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java index 3bc8a8dc27bf..852a7a637234 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java @@ -18,22 +18,18 @@ package org.apache.hadoop.ozone.om.response.snapshot; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE; -import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; import java.io.IOException; -import java.nio.file.Path; import java.util.List; import java.util.Map; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.lock.OMLockDetails; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; @@ -95,10 +91,9 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, if (snapshotInfo == null) { continue; } - + OmSnapshotManager omSnapshotManager = metadataManager.getOzoneManager().getOmSnapshotManager(); // Remove and close snapshot's RocksDB instance from SnapshotCache. - ((OmMetadataManagerImpl) omMetadataManager).getOzoneManager().getOmSnapshotManager() - .invalidateCacheEntry(snapshotInfo.getSnapshotId()); + omSnapshotManager.invalidateCacheEntry(snapshotInfo.getSnapshotId()); // Remove the snapshot from snapshotId to snapshotTableKey map. ((OmMetadataManagerImpl) omMetadataManager).getSnapshotChainManager() .removeFromSnapshotIdToTable(snapshotInfo.getSnapshotId()); @@ -109,7 +104,8 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, // snapshot purged txn is flushed to rocksdb. updateLocalData(snapshotLocalDataManager, snapshotInfo); // Delete Snapshot checkpoint directory. 
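      // A maxVersion of -1 tells deleteSnapshotCheckpointDirectories to remove every checkpoint
      // version up to the snapshot's current maximum (see its javadoc above).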
- deleteCheckpointDirectory(snapshotLocalDataManager, omMetadataManager, snapshotInfo); + + omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotInfo.getSnapshotId(), -1); // Delete snapshotInfo from the table. omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation, dbKey); } @@ -133,34 +129,6 @@ private void updateLocalData(OmSnapshotLocalDataManager localDataManager, Snapsh } } - /** - * Deletes the checkpoint directory for a snapshot. - */ - private void deleteCheckpointDirectory(OmSnapshotLocalDataManager snapshotLocalDataManager, - OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) throws IOException { - // Acquiring write lock to avoid race condition with sst filtering service which creates a sst filtered file - // inside the snapshot directory. Any operation apart which doesn't create/delete files under this snapshot - // directory can run in parallel along with this operation. - OMLockDetails omLockDetails = omMetadataManager.getLock() - .acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString()); - boolean acquiredSnapshotLock = omLockDetails.isLockAcquired(); - if (acquiredSnapshotLock) { - try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataMetaProvider snapMetaProvider = - snapshotLocalDataManager.getOmSnapshotLocalDataMeta(snapshotInfo)) { - Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo, - snapMetaProvider.getMeta().getVersion()); - try { - FileUtils.deleteDirectory(snapshotDirPath.toFile()); - } catch (IOException ex) { - LOG.error("Failed to delete snapshot directory {} for snapshot {}", - snapshotDirPath, snapshotInfo.getTableKey(), ex); - } finally { - omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString()); - } - } - } - } - @VisibleForTesting public Map getUpdatedSnapInfos() { return updatedSnapInfos; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java index 3868436eb518..0699fdd39dbf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java @@ -291,7 +291,10 @@ public void release(UUID key) { */ public UncheckedAutoCloseableSupplier lock() { return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK), - () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> cleanup(true)); + () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> { + cleanup(true); + return dbMap.isEmpty(); + }); } /** @@ -303,7 +306,10 @@ public UncheckedAutoCloseableSupplier lock() { public UncheckedAutoCloseableSupplier lock(UUID snapshotId) { return lock(() -> lock.acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotId.toString()), () -> lock.releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotId.toString()), - () -> cleanup(snapshotId)); + () -> { + cleanup(snapshotId, false); + return !dbMap.containsKey(snapshotId); + }); } private OMLockDetails getEmptyOmLockDetails(OMLockDetails lockDetails) { @@ -311,14 +317,13 @@ private OMLockDetails getEmptyOmLockDetails(OMLockDetails lockDetails) { } private UncheckedAutoCloseableSupplier lock(Supplier lockFunction, - Supplier unlockFunction, Supplier cleanupFunction) { + Supplier unlockFunction, Supplier cleanupFunction) { Supplier emptyLockFunction = () -> getEmptyOmLockDetails(lockFunction.get()); 
Supplier emptyUnlockFunction = () -> getEmptyOmLockDetails(unlockFunction.get()); AtomicReference lockDetails = new AtomicReference<>(emptyLockFunction.get()); if (lockDetails.get().isLockAcquired()) { - cleanupFunction.get(); - if (!dbMap.isEmpty()) { + if (!cleanupFunction.get()) { lockDetails.set(emptyUnlockFunction.get()); } } @@ -349,14 +354,19 @@ public OMLockDetails get() { private synchronized Void cleanup(boolean force) { if (force || dbMap.size() > cacheSizeLimit) { for (UUID evictionKey : pendingEvictionQueue) { - cleanup(evictionKey); + cleanup(evictionKey, true); } } return null; } - private synchronized Void cleanup(UUID evictionKey) { + private synchronized Void cleanup(UUID evictionKey, boolean expectKeyToBePresent) { ReferenceCounted snapshot = dbMap.get(evictionKey); + + if (!expectKeyToBePresent && snapshot == null) { + return null; + } + if (snapshot != null && snapshot.getTotalRefCount() == 0) { try { compactSnapshotDB(snapshot.get()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java index 9be4a0d33899..c54f405e242c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java @@ -17,42 +17,80 @@ package org.apache.hadoop.ozone.om.snapshot.defrag; +import static java.nio.file.Files.createDirectories; +import static org.apache.commons.io.file.PathUtils.deleteDirectory; +import static org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT; -import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_GC_LOCK; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT; +import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_CONTENT_LOCK; +import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; -import java.util.Collections; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.BackgroundService; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.hdds.utils.db.ByteArrayCodec; +import org.apache.hadoop.hdds.utils.db.CodecBuffer; +import org.apache.hadoop.hdds.utils.db.CodecException; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import 
org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; +import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; +import org.apache.hadoop.hdds.utils.db.StringCodec; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.TablePrefixInfo; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; +import org.apache.hadoop.ozone.om.OmSnapshotLocalData; import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; -import org.apache.hadoop.ozone.om.SstFilteringService; -import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider; +import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; +import org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer; +import org.apache.hadoop.ozone.om.snapshot.util.TableMergeIterator; import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; +import org.apache.hadoop.ozone.util.ClosableIterator; +import org.apache.logging.log4j.util.Strings; +import org.apache.ozone.rocksdb.util.SstFileInfo; +import org.apache.ozone.rocksdb.util.SstFileSetReader; +import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,16 +124,24 @@ public class SnapshotDefragService extends BackgroundService private final AtomicLong snapshotsDefraggedCount; private final AtomicBoolean running; - private final MultiSnapshotLocks snapshotIdLocks; + private final MultiSnapshotLocks snapshotContentLocks; private final OzoneConfiguration conf; private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); + private final String tmpDefragDir; + private final OmSnapshotManager omSnapshotManager; + private final OmSnapshotLocalDataManager snapshotLocalDataManager; + private final List lockIds; + private final CompositeDeltaDiffComputer deltaDiffComputer; + private final Path differTmpDir; public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, - OzoneManager ozoneManager, OzoneConfiguration configuration) { + OzoneManager ozoneManager, OzoneConfiguration configuration) throws IOException { super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, serviceTimeout, ozoneManager.getThreadNamePrefix()); this.ozoneManager = ozoneManager; + this.omSnapshotManager = ozoneManager.getOmSnapshotManager(); + this.snapshotLocalDataManager = omSnapshotManager.getSnapshotLocalDataManager(); this.snapshotLimitPerTask = configuration .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, 
SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT); @@ -103,7 +149,22 @@ public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, snapshotsDefraggedCount = new AtomicLong(0); running = new AtomicBoolean(false); IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock(); - this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true, 1); + this.snapshotContentLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_DB_CONTENT_LOCK, true, 1); + Path tmpDefragDirPath = ozoneManager.getMetadataManager().getSnapshotParentDir().toAbsolutePath() + .resolve("tmp_defrag"); + // Delete and recreate tmp dir if it exists + if (tmpDefragDirPath.toFile().exists()) { + deleteDirectory(tmpDefragDirPath); + } + createDirectories(tmpDefragDirPath); + this.tmpDefragDir = tmpDefragDirPath.toString(); + this.differTmpDir = tmpDefragDirPath.resolve("differSstFiles"); + + this.deltaDiffComputer = new CompositeDeltaDiffComputer(omSnapshotManager, + ozoneManager.getMetadataManager(), differTmpDir, (status) -> { + LOG.debug("Snapshot defragmentation diff status: {}", status); + }, false, !isRocksToolsNativeLibAvailable()); + this.lockIds = new ArrayList<>(1); } @Override @@ -135,48 +196,318 @@ private boolean isRocksToolsNativeLibAvailable() { } /** - * Checks if a snapshot needs defragmentation by examining its YAML metadata. + * Determines whether the specified snapshot requires defragmentation and returns + * a pair indicating the need for defragmentation and the corresponding version of the snapshot. + * + * @param snapshotInfo Information about the snapshot to be checked for defragmentation. + * @return A pair containing a boolean value and an integer: + * - The boolean value indicates whether the snapshot requires defragmentation + * (true if needed, false otherwise). + * - The integer represents the version of the snapshot being evaluated. + * @throws IOException If an I/O error occurs while accessing the local snapshot data or metadata. */ - private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { - if (!SstFilteringService.isSstFiltered(conf, snapshotInfo)) { - return false; - } - try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider readableOmSnapshotLocalDataProvider = - ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo)) { - Path snapshotPath = OmSnapshotManager.getSnapshotPath( - ozoneManager.getMetadataManager(), snapshotInfo, - readableOmSnapshotLocalDataProvider.getSnapshotLocalData().getVersion()); + @VisibleForTesting + Pair needsDefragmentation(SnapshotInfo snapshotInfo) throws IOException { + // Update snapshot local metadata to point to the correct previous snapshotId if it was different and check if + // snapshot needs defrag. 
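    // A writable provider is used because the previous-snapshot pointer may have to be rewritten
    // and committed even when the snapshot turns out to need no defragmentation.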
+ try (WritableOmSnapshotLocalDataProvider writableOmSnapshotLocalDataProvider = + snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) { // Read snapshot local metadata from YAML // Check if snapshot needs compaction (defragmentation) - boolean needsDefrag = readableOmSnapshotLocalDataProvider.needsDefrag(); - LOG.debug("Snapshot {} needsDefragmentation field value: {}", snapshotInfo.getName(), needsDefrag); + writableOmSnapshotLocalDataProvider.commit(); + boolean needsDefrag = writableOmSnapshotLocalDataProvider.needsDefrag(); + OmSnapshotLocalData localData = writableOmSnapshotLocalDataProvider.getSnapshotLocalData(); + if (!needsDefrag) { + OmSnapshotLocalData previousLocalData = writableOmSnapshotLocalDataProvider.getPreviousSnapshotLocalData(); + LOG.debug("Skipping defragmentation since snapshot has already been defragmented: id : {}(version: {}=>{}) " + + "previousId: {}(version: {})", snapshotInfo.getSnapshotId(), localData.getVersion(), + localData.getVersionSstFileInfos().get(localData.getVersion()).getPreviousSnapshotVersion(), + snapshotInfo.getPathPreviousSnapshotId(), previousLocalData.getVersion()); + } else { + LOG.debug("Snapshot {} needsDefragmentation field value: true", snapshotInfo.getSnapshotId()); + } + return Pair.of(needsDefrag, localData.getVersion()); + } + } - return needsDefrag; - } catch (IOException e) { - LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed", - snapshotInfo.getName(), e); - return true; + private Pair getTableBounds(Table table) throws RocksDatabaseException, CodecException { + String tableLowestValue = null, tableHighestValue = null; + try (TableIterator keyIterator = table.keyIterator()) { + if (keyIterator.hasNext()) { + // Setting the lowest value to the first key in the table. + tableLowestValue = keyIterator.next(); + } + keyIterator.seekToLast(); + if (keyIterator.hasNext()) { + // Setting the highest value to the last key in the table. + tableHighestValue = keyIterator.next(); + } } + return Pair.of(tableLowestValue, tableHighestValue); } /** - * Performs full defragmentation for the first snapshot in the chain. - * This is a simplified implementation that demonstrates the concept. + * Performs a full defragmentation process for specified tables in the metadata manager. + * This method processes all the entries in the tables for the provided prefix information, + * deletes specified key ranges, and compacts the tables to remove tombstones. + * + * @param checkpointDBStore the metadata manager responsible for managing tables during the checkpoint process + * @param prefixInfo the prefix information used to identify bucket prefix and determine key ranges in the tables + * @param incrementalTables the set of tables for which incremental defragmentation is performed. 
+ * @throws IOException if an I/O error occurs during table operations or compaction */ - private void performFullDefragmentation(SnapshotInfo snapshotInfo, - OmSnapshot omSnapshot) throws IOException { + @VisibleForTesting + void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefixInfo, + Set incrementalTables) throws IOException { + for (String table : incrementalTables) { + Table checkpointTable = checkpointDBStore.getTable(table, StringCodec.get(), + ByteArrayCodec.get()); + String tableBucketPrefix = prefixInfo.getTablePrefix(table); + String prefixUpperBound = getLexicographicallyHigherString(tableBucketPrefix); + + Pair tableBounds = getTableBounds(checkpointTable); + String tableLowestValue = tableBounds.getLeft(); + String tableHighestValue = tableBounds.getRight(); + + // If lowest value is not null and if the bucket prefix corresponding to the table is greater than lower then + // delete the range between lowest value and bucket prefix. + if (tableLowestValue != null && tableLowestValue.compareTo(tableBucketPrefix) < 0) { + checkpointTable.deleteRange(tableLowestValue, tableBucketPrefix); + } + // If highest value is not null and if the next higher lexicographical string of bucket prefix corresponding to + // the table is less than equal to the highest value then delete the range between bucket prefix + // and also the highest value. + if (tableHighestValue != null && tableHighestValue.compareTo(prefixUpperBound) >= 0) { + checkpointTable.deleteRange(prefixUpperBound, tableHighestValue); + checkpointTable.delete(tableHighestValue); + } + // Compact the table completely with kForce to get rid of tombstones. + try (ManagedCompactRangeOptions compactRangeOptions = new ManagedCompactRangeOptions()) { + compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce); + compactRangeOptions.setExclusiveManualCompaction(true); + checkpointDBStore.compactTable(table, compactRangeOptions); + } + } + } - // TODO: Implement full defragmentation + /** + * Spills table difference into an SST file based on the provided delta file paths, + * current snapshot table, previous snapshot table, and an optional table key prefix. + * + * The method reads the delta files and compares the records against the snapshot tables. + * Any differences, including tombstones (deleted entries), are written to a new SST file. 
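   * The resulting SST file, when non-empty, is later ingested into the defragmented checkpoint
   * for the corresponding table.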
+ * + * @param deltaFilePaths the list of paths to the delta files to process + * @param snapshotTable the current snapshot table for comparison + * @param previousSnapshotTable the previous snapshot table for comparison + * @param tableKeyPrefix the prefix for filtering certain keys, or null if all keys are to be included + * @return a pair of the path of the created SST file containing the differences and a boolean + * indicating whether any delta entries were written (true if there are differences, false otherwise) + * @throws IOException if an I/O error occurs during processing + */ + @VisibleForTesting + Pair spillTableDiffIntoSstFile(List deltaFilePaths, Table snapshotTable, + Table previousSnapshotTable, String tableKeyPrefix) throws IOException { + String sstFileReaderUpperBound = null; + if (Strings.isNotEmpty(tableKeyPrefix)) { + sstFileReaderUpperBound = getLexicographicallyHigherString(tableKeyPrefix); + } + SstFileSetReader sstFileSetReader = new SstFileSetReader(deltaFilePaths); + Path fileToBeIngested = differTmpDir.resolve(snapshotTable.getName() + "-" + UUID.randomUUID() + + SST_FILE_EXTENSION); + int deltaEntriesCount = 0; + try (ClosableIterator keysToCheck = + sstFileSetReader.getKeyStreamWithTombstone(tableKeyPrefix, sstFileReaderUpperBound); + TableMergeIterator tableMergeIterator = new TableMergeIterator<>(keysToCheck, + tableKeyPrefix, snapshotTable, previousSnapshotTable); + RDBSstFileWriter rdbSstFileWriter = new RDBSstFileWriter(fileToBeIngested.toFile())) { + while (tableMergeIterator.hasNext()) { + Table.KeyValue> kvs = tableMergeIterator.next(); + // Check if the values are equal or if they are not equal then the value should be written to the + // delta sstFile. + if (!Arrays.equals(kvs.getValue().get(0), kvs.getValue().get(1))) { + try (CodecBuffer key = StringCodec.get().toHeapCodecBuffer(kvs.getKey())) { + byte[] keyArray = key.asReadOnlyByteBuffer().array(); + byte[] val = kvs.getValue().get(0); + // If the value is null then add a tombstone to the delta sstFile. + if (val == null) { + rdbSstFileWriter.delete(keyArray); + } else { + rdbSstFileWriter.put(keyArray, val); + } + } + deltaEntriesCount++; + } + } + } catch (RocksDBException e) { + throw new RocksDatabaseException("Error while reading sst files.", e); + } + // If there are no delta entries then delete the delta file. No need to ingest the file as a diff. + return Pair.of(fileToBeIngested, deltaEntriesCount != 0); } /** - * Performs incremental defragmentation using diff from previous defragmented snapshot. + * Performs an incremental defragmentation process, which involves determining + * and processing delta files between snapshots for metadata updates. The method + * computes the changes, manages file ingestion to the checkpoint metadata manager, + * and ensures that all delta files are deleted after processing. + * + * @param previousSnapshotInfo information about the previous snapshot. + * @param snapshotInfo information about the current snapshot for which + * incremental defragmentation is performed. + * @param snapshotVersion the version of the snapshot to be processed. + * @param checkpointStore the dbStore instance where data + * updates are ingested after being processed. + * @param bucketPrefixInfo table prefix information associated with buckets, + * used to determine bounds for processing keys. + * @param incrementalTables the set of tables for which incremental defragmentation is performed. + * @throws IOException if an I/O error occurs during processing. 
*/ - private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot, - SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot) + @VisibleForTesting + void performIncrementalDefragmentation(SnapshotInfo previousSnapshotInfo, SnapshotInfo snapshotInfo, + int snapshotVersion, DBStore checkpointStore, TablePrefixInfo bucketPrefixInfo, Set incrementalTables) throws IOException { + // Map of delta files grouped on the basis of the tableName. + Collection> allTableDeltaFiles = this.deltaDiffComputer.getDeltaFiles( + previousSnapshotInfo, snapshotInfo, incrementalTables); + + Map> tableGroupedDeltaFiles = allTableDeltaFiles.stream() + .collect(Collectors.groupingBy(pair -> pair.getValue().getColumnFamily(), + Collectors.mapping(Pair::getKey, Collectors.toList()))); + + String volumeName = snapshotInfo.getVolumeName(); + String bucketName = snapshotInfo.getBucketName(); + + Set filesToBeDeleted = new HashSet<>(); + // All files computed as delta must be deleted irrespective of whether ingestion succeeded or not. + allTableDeltaFiles.forEach(pair -> filesToBeDeleted.add(pair.getKey())); + try (UncheckedAutoCloseableSupplier snapshot = + omSnapshotManager.getActiveSnapshot(volumeName, bucketName, snapshotInfo.getName()); + UncheckedAutoCloseableSupplier previousSnapshot = + omSnapshotManager.getActiveSnapshot(volumeName, bucketName, previousSnapshotInfo.getName())) { + for (Map.Entry> entry : tableGroupedDeltaFiles.entrySet()) { + String table = entry.getKey(); + List deltaFiles = entry.getValue(); + Path fileToBeIngested; + if (deltaFiles.size() == 1 && snapshotVersion > 0) { + // If there is only one delta file for the table and the snapshot version is also not 0 then the same delta + // file can reingested into the checkpointStore. + fileToBeIngested = deltaFiles.get(0); + } else { + Table snapshotTable = snapshot.get().getMetadataManager().getStore() + .getTable(table, StringCodec.get(), ByteArrayCodec.get()); + Table previousSnapshotTable = previousSnapshot.get().getMetadataManager().getStore() + .getTable(table, StringCodec.get(), ByteArrayCodec.get()); + String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table); + Pair spillResult = spillTableDiffIntoSstFile(deltaFiles, snapshotTable, + previousSnapshotTable, tableBucketPrefix); + fileToBeIngested = spillResult.getValue() ? spillResult.getLeft() : null; + filesToBeDeleted.add(spillResult.getLeft()); + } + if (fileToBeIngested != null) { + if (!fileToBeIngested.toFile().exists()) { + throw new IOException("Delta file does not exist: " + fileToBeIngested); + } + Table checkpointTable = checkpointStore.getTable(table); + checkpointTable.loadFromFile(fileToBeIngested.toFile()); + } + } + } finally { + for (Path path : filesToBeDeleted) { + if (path.toFile().exists()) { + if (!path.toFile().delete()) { + LOG.warn("Failed to delete file: {}", path); + } + } + } + } + } + + /** + * Ingests non-incremental tables from a snapshot into a checkpoint database store. + * This involves exporting table data from the snapshot to intermediate SST files + * and ingesting them into the corresponding tables in the checkpoint database store. + * Tables that are part of incremental defragmentation are excluded from this process. + * + * @param checkpointDBStore the database store where non-incremental tables are ingested. + * @param snapshotInfo the metadata information of the snapshot being processed. + * @param bucketPrefixInfo prefix information used for determining table prefixes. 
+ * @param incrementalTables the set of tables identified for incremental defragmentation. + * @throws IOException if an I/O error occurs during table ingestion or file operations. + */ + @VisibleForTesting + void ingestNonIncrementalTables(DBStore checkpointDBStore, + SnapshotInfo snapshotInfo, TablePrefixInfo bucketPrefixInfo, Set incrementalTables) throws IOException { + String volumeName = snapshotInfo.getVolumeName(); + String bucketName = snapshotInfo.getBucketName(); + String snapshotName = snapshotInfo.getName(); + Set filesToBeDeleted = new HashSet<>(); + try (UncheckedAutoCloseableSupplier snapshot = omSnapshotManager.getActiveSnapshot(volumeName, + bucketName, snapshotName)) { + DBStore snapshotDBStore = snapshot.get().getMetadataManager().getStore(); + for (Table snapshotTable : snapshotDBStore.listTables()) { + String snapshotTableName = snapshotTable.getName(); + if (!incrementalTables.contains(snapshotTable.getName())) { + Path tmpSstFile = differTmpDir.resolve(snapshotTable.getName() + "-" + UUID.randomUUID() + + SST_FILE_EXTENSION); + filesToBeDeleted.add(tmpSstFile); + String prefix = bucketPrefixInfo.getTablePrefix(snapshotTableName); + byte[] prefixBytes = Strings.isBlank(prefix) ? null : StringCodec.get().toPersistedFormat(prefix); + Table snapshotTableBytes = snapshotDBStore.getTable(snapshotTableName, ByteArrayCodec.get(), + ByteArrayCodec.get()); + snapshotTableBytes.dumpToFileWithPrefix(tmpSstFile.toFile(), prefixBytes); + Table checkpointTable = checkpointDBStore.getTable(snapshotTableName, ByteArrayCodec.get(), + ByteArrayCodec.get()); + checkpointTable.loadFromFile(tmpSstFile.toFile()); + } + } + } finally { + for (Path path : filesToBeDeleted) { + if (path.toFile().exists()) { + if (!path.toFile().delete()) { + LOG.warn("Failed to delete file for ingesting non incremental table: {}", path); + } + } + } + } + } - // TODO: Implement incremental defragmentation + /** + * Atomically switches the current snapshot database to a new version derived + * from the provided checkpoint directory. This involves moving the checkpoint + * path to a versioned directory, updating the snapshot metadata, and committing + * the changes to persist the snapshot version update. + * + * @param snapshotId The UUID identifying the snapshot to update. + * @param checkpointPath The path to the checkpoint directory that serves as the basis + * for the updated snapshot version. + * @return The previous version number of the snapshot prior to the update. + * @throws IOException If an I/O error occurs during file operations, checkpoint processing, + * or snapshot metadata updates. + */ + @VisibleForTesting + int atomicSwitchSnapshotDB(UUID snapshotId, Path checkpointPath) throws IOException { + try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider = + snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)) { + OmSnapshotLocalData localData = snapshotLocalDataProvider.getSnapshotLocalData(); + Path nextVersionPath = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, + localData.getVersion() + 1); + // Remove the directory if it exists. + if (nextVersionPath.toFile().exists()) { + deleteDirectory(nextVersionPath); + } + // Move the checkpoint directory to the next version directory. + Files.move(checkpointPath, nextVersionPath); + RocksDBCheckpoint dbCheckpoint = new RocksDBCheckpoint(nextVersionPath); + // Add a new version to the local data file. 
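      // The relocated checkpoint is opened read-only so that its contents can be registered
      // as the snapshot's next version before the local data file is committed.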
+ try (OmMetadataManagerImpl newVersionCheckpointMetadataManager = + OmMetadataManagerImpl.createCheckpointMetadataManager(conf, dbCheckpoint, true)) { + RDBStore newVersionCheckpointStore = (RDBStore) newVersionCheckpointMetadataManager.getStore(); + snapshotLocalDataProvider.addSnapshotVersion(newVersionCheckpointStore); + snapshotLocalDataProvider.commit(); + } + return localData.getVersion() - 1; + } } private final class SnapshotDefragTask implements BackgroundTask { @@ -192,6 +523,105 @@ public BackgroundTaskResult call() throws Exception { } } + /** + * Creates a new checkpoint by modifying the metadata manager from a snapshot. + * This involves generating a temporary checkpoint and truncating specified + * column families from the checkpoint before returning the updated metadata manager. + * + * @param snapshotInfo Information about the snapshot for which the checkpoint + * is being created. + * @param incrementalColumnFamilies A set of table names representing incremental + * column families to be retained in the checkpoint. + * @return A new instance of OmMetadataManagerImpl initialized with the modified + * checkpoint. + * @throws IOException If an I/O error occurs during snapshot processing, + * checkpoint creation, or table operations. + */ + @VisibleForTesting + OmMetadataManagerImpl createCheckpoint(SnapshotInfo snapshotInfo, + Set incrementalColumnFamilies) throws IOException { + try (UncheckedAutoCloseableSupplier snapshot = omSnapshotManager.getActiveSnapshot( + snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), snapshotInfo.getName())) { + DBCheckpoint checkpoint = snapshot.get().getMetadataManager().getStore().getCheckpoint(tmpDefragDir, true); + try (OmMetadataManagerImpl metadataManagerBeforeTruncate = + OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint, false)) { + DBStore dbStore = metadataManagerBeforeTruncate.getStore(); + for (String table : metadataManagerBeforeTruncate.listTableNames()) { + if (!incrementalColumnFamilies.contains(table)) { + dbStore.dropTable(table); + } + } + } catch (Exception e) { + throw new IOException("Failed to close checkpoint of snapshot: " + snapshotInfo.getSnapshotId(), e); + } + // This will recreate the column families in the checkpoint. + return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint, false); + } + } + + private void acquireContentLock(UUID snapshotID) throws IOException { + lockIds.clear(); + lockIds.add(snapshotID); + OMLockDetails lockDetails = snapshotContentLocks.acquireLock(lockIds); + if (!lockDetails.isLockAcquired()) { + throw new IOException("Failed to acquire lock on snapshot: " + snapshotID); + } + LOG.debug("Acquired MultiSnapshotLocks on snapshot: {}", snapshotID); + } + + private boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID snapshotId) throws IOException { + SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, chainManager, snapshotId); + + if (snapshotInfo.getSnapshotStatus() != SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) { + LOG.debug("Skipping defragmentation for non-active snapshot: {} (ID: {})", + snapshotInfo.getName(), snapshotInfo.getSnapshotId()); + return false; + } + Pair needsDefragVersionPair = needsDefragmentation(snapshotInfo); + if (!needsDefragVersionPair.getLeft()) { + return false; + } + // Create a checkpoint of the previous snapshot or the current snapshot if it is the first snapshot in the chain. + SnapshotInfo checkpointSnapshotInfo = snapshotInfo.getPathPreviousSnapshotId() == null ? 
snapshotInfo : + SnapshotUtils.getSnapshotInfo(ozoneManager, chainManager, snapshotInfo.getPathPreviousSnapshotId()); + + OmMetadataManagerImpl checkpointMetadataManager = createCheckpoint(checkpointSnapshotInfo, + COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT); + Path checkpointLocation = checkpointMetadataManager.getStore().getDbLocation().toPath(); + try { + DBStore checkpointDBStore = checkpointMetadataManager.getStore(); + TablePrefixInfo prefixInfo = ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(), + snapshotInfo.getBucketName()); + // If first snapshot in the chain perform full defragmentation. + if (snapshotInfo.getPathPreviousSnapshotId() == null) { + performFullDefragmentation(checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT); + } else { + performIncrementalDefragmentation(checkpointSnapshotInfo, snapshotInfo, needsDefragVersionPair.getValue(), + checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT); + } + int previousVersion; + // Acquire Content lock on the snapshot to ensure the contents of the table doesn't get changed. + acquireContentLock(snapshotId); + try { + // Ingestion of incremental tables KeyTable/FileTable/DirectoryTable done now we need to just reingest the + // remaining tables from the original snapshot. + ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT); + checkpointMetadataManager.close(); + checkpointMetadataManager = null; + // Switch the snapshot DB location to the new version. + previousVersion = atomicSwitchSnapshotDB(snapshotId, checkpointLocation); + } finally { + snapshotContentLocks.releaseLock(); + } + omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotId, previousVersion); + } finally { + if (checkpointMetadataManager != null) { + checkpointMetadataManager.close(); + } + } + return true; + } + public synchronized boolean triggerSnapshotDefragOnce() throws IOException { final long count = runCount.incrementAndGet(); @@ -217,119 +647,26 @@ public synchronized boolean triggerSnapshotDefragOnce() throws IOException { final SnapshotChainManager snapshotChainManager = ((OmMetadataManagerImpl) ozoneManager.getMetadataManager()).getSnapshotChainManager(); - final Table snapshotInfoTable = - ozoneManager.getMetadataManager().getSnapshotInfoTable(); - // Use iterator(false) to iterate forward through the snapshot chain Iterator snapshotIterator = snapshotChainManager.iterator(false); long snapshotLimit = snapshotLimitPerTask; - while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) { - // Get SnapshotInfo for the current snapshot in the chain + // Get SnapshotInfo for the current snapshot in the chain. 
UUID snapshotId = snapshotIterator.next(); - String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId); - SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey); - if (snapshotToDefrag == null) { - LOG.warn("Snapshot with ID '{}' not found in snapshot info table", snapshotId); - continue; - } - - // Skip deleted snapshots - if (snapshotToDefrag.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) { - LOG.debug("Skipping deleted snapshot: {} (ID: {})", - snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); - continue; - } - - // Check if this snapshot needs defragmentation - if (!needsDefragmentation(snapshotToDefrag)) { - LOG.debug("Skipping already defragged snapshot: {} (ID: {})", - snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); - continue; - } - - LOG.info("Will defrag snapshot: {} (ID: {})", - snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); - - // Acquire MultiSnapshotLocks - if (!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId())) - .isLockAcquired()) { - LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})", - snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); - break; - } - - try { - LOG.info("Processing snapshot defragmentation for: {} (ID: {})", - snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); - - // Get snapshot through SnapshotCache for proper locking - try (UncheckedAutoCloseableSupplier snapshotSupplier = - snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) { - - OmSnapshot omSnapshot = snapshotSupplier.get(); - - UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId(); - boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null; - if (isFirstSnapshotInPath) { - LOG.info("Performing full defragmentation for first snapshot (in path): {}", - snapshotToDefrag.getName()); - performFullDefragmentation(snapshotToDefrag, omSnapshot); - } else { - final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId); - SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey); - - LOG.info("Performing incremental defragmentation for snapshot: {} " + - "based on previous defragmented snapshot: {}", - snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); - - // If previous path snapshot is not null, it must have been defragmented already - // Sanity check to ensure previous snapshot exists and is defragmented - if (needsDefragmentation(previousDefraggedSnapshot)) { - LOG.error("Fatal error before defragging snapshot: {}. " + - "Previous snapshot in path {} was not defragged while it is expected to be.", - snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); - break; - } - - performIncrementalDefragmentation(snapshotToDefrag, - previousDefraggedSnapshot, omSnapshot); - } - - // TODO: Update snapshot metadata here? 
- - // Close and evict the original snapshot DB from SnapshotCache - // TODO: Implement proper eviction from SnapshotCache - LOG.info("Defragmentation completed for snapshot: {}", - snapshotToDefrag.getName()); - + try (UncheckedAutoCloseable lock = getBootstrapStateLock().acquireReadLock()) { + if (checkAndDefragSnapshot(snapshotChainManager, snapshotId)) { snapshotLimit--; snapshotsDefraggedCount.getAndIncrement(); - - } catch (OMException ome) { - if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) { - LOG.info("Snapshot {} was deleted during defragmentation", - snapshotToDefrag.getName()); - } else { - LOG.error("OMException during snapshot defragmentation for: {}", - snapshotToDefrag.getName(), ome); - } } - - } catch (Exception e) { - LOG.error("Exception during snapshot defragmentation for: {}", - snapshotToDefrag.getName(), e); + } catch (IOException e) { + LOG.error("Exception while defragmenting snapshot: {}", snapshotId, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Snapshot defragmentation task interrupted", e); return false; - } finally { - // Release lock MultiSnapshotLocks - snapshotIdLocks.releaseLock(); - LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})", - snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); - } } - return true; } @@ -371,6 +708,18 @@ public BootstrapStateHandler.Lock getBootstrapStateLock() { public void shutdown() { running.set(false); super.shutdown(); + try { + deltaDiffComputer.close(); + } catch (IOException e) { + LOG.error("Error while closing delta diff computer.", e); + } + Path tmpDirPath = Paths.get(tmpDefragDir); + if (tmpDirPath.toFile().exists()) { + try { + deleteDirectory(tmpDirPath); + } catch (IOException e) { + LOG.error("Failed to delete temporary directory: {}", tmpDirPath, e); + } + } } } - diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java new file mode 100644 index 000000000000..46fa02d21f34 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.snapshot.defrag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OmSnapshotLocalData; +import org.apache.hadoop.ozone.om.OmSnapshotManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider; +import org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; +import org.apache.hadoop.ozone.upgrade.LayoutFeature; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockitoAnnotations; + +/** + * Unit tests for SnapshotDefragService. 
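 * Covers service start/pause/resume and the needsDefragmentation decision against mocked
 * snapshot local data.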
+ */ +public class TestSnapshotDefragService { + + @Mock + private OzoneManager ozoneManager; + + @Mock + private OmSnapshotManager omSnapshotManager; + + @Mock + private OmSnapshotLocalDataManager snapshotLocalDataManager; + + @Mock + private OmMetadataManagerImpl metadataManager; + + @Mock + private IOzoneManagerLock omLock; + + @Mock + private OMLayoutVersionManager versionManager; + + @TempDir + private Path tempDir; + private SnapshotDefragService defragService; + private AutoCloseable mocks; + + @BeforeEach + public void setup() throws IOException { + mocks = MockitoAnnotations.openMocks(this); + OzoneConfiguration configuration = new OzoneConfiguration(); + + // Setup basic mocks + when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); + when(ozoneManager.getMetadataManager()).thenReturn(metadataManager); + when(ozoneManager.getThreadNamePrefix()).thenReturn("TestOM"); + when(ozoneManager.isRunning()).thenReturn(true); + when(ozoneManager.getVersionManager()).thenReturn(versionManager); + when(ozoneManager.getOmRatisServer()).thenReturn(mock(OzoneManagerRatisServer.class)); + + when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(snapshotLocalDataManager); + when(metadataManager.getLock()).thenReturn(omLock); + when(metadataManager.getSnapshotParentDir()).thenReturn(tempDir); + when(versionManager.isAllowed(any(LayoutFeature.class))).thenReturn(true); + try (MockedConstruction compositeDeltaDiffComputer = + mockConstruction(CompositeDeltaDiffComputer.class)) { + // Initialize service + defragService = new SnapshotDefragService( + 10000, // interval + TimeUnit.MILLISECONDS, + 60000, // timeout + ozoneManager, + configuration + ); + assertEquals(1, compositeDeltaDiffComputer.constructed().size()); + } + + } + + @AfterEach + public void tearDown() throws Exception { + if (defragService != null) { + defragService.shutdown(); + } + if (mocks != null) { + mocks.close(); + } + } + + @Test + public void testServiceStartAndPause() { + defragService.start(); + assertTrue(defragService.getSnapshotsDefraggedCount().get() >= 0); + + defragService.pause(); + assertNotNull(defragService); + + defragService.resume(); + assertNotNull(defragService); + } + + @Test + public void testNeedsDefragmentationAlreadyDefragmented() throws IOException { + UUID snapshotId = UUID.randomUUID(); + SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, "vol1", "bucket1", "snap1"); + + WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class); + OmSnapshotLocalData previousLocalData = mock(OmSnapshotLocalData.class); + + when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)).thenReturn(provider); + when(provider.needsDefrag()).thenReturn(false); + when(provider.getSnapshotLocalData()).thenReturn(localData); + when(provider.getPreviousSnapshotLocalData()).thenReturn(previousLocalData); + when(localData.getVersion()).thenReturn(1); + when(previousLocalData.getVersion()).thenReturn(0); + + + OmSnapshotLocalData.VersionMeta versionInfo = mock(OmSnapshotLocalData.VersionMeta.class); + when(versionInfo.getPreviousSnapshotVersion()).thenReturn(0); + Map versionMap = ImmutableMap.of(1, versionInfo); + when(localData.getVersionSstFileInfos()).thenReturn(versionMap); + + Pair result = defragService.needsDefragmentation(snapshotInfo); + + assertFalse(result.getLeft()); + assertEquals(1, result.getRight()); + verify(provider).commit(); + verify(provider).close(); + } 
+ + @Test + public void testNeedsDefragmentationRequiresDefrag() throws IOException { + UUID snapshotId = UUID.randomUUID(); + SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, "vol1", "bucket1", "snap1"); + + WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class); + AtomicInteger commit = new AtomicInteger(0); + when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)).thenReturn(provider); + when(provider.getSnapshotLocalData()).thenReturn(localData); + doAnswer(invocationOnMock -> { + commit.incrementAndGet(); + return null; + }).when(provider).commit(); + when(provider.needsDefrag()).thenAnswer(i -> commit.get() == 1); + int version = ThreadLocalRandom.current().nextInt(100); + when(localData.getVersion()).thenReturn(version); + + Pair result = defragService.needsDefragmentation(snapshotInfo); + + assertTrue(result.getLeft()); + assertEquals(version, result.getRight()); + verify(provider).close(); + } + + /** + * Helper method to create a mock SnapshotInfo. + */ + private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, String volume, + String bucket, String name) { + SnapshotInfo.Builder builder = SnapshotInfo.newBuilder(); + builder.setSnapshotId(snapshotId); + builder.setVolumeName(volume); + builder.setBucketName(bucket); + builder.setName(name); + builder.setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE); + builder.setCreationTime(System.currentTimeMillis()); + return builder.build(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java index 19579a59e16e..6ff777656188 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java @@ -527,3 +527,5 @@ private OmSnapshotLocalData createMockSnapshotLocalDataWithVersions(UUID snapsho return localData; } } + +
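For context on how the new DBStore#dropTable and OmMetadataManagerImpl#listTableNames calls introduced above fit together, here is a minimal sketch of the column-family truncation step. The helper class and method names are hypothetical and only mirror the createCheckpoint logic in SnapshotDefragService; they are not part of this patch.

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;

/** Hypothetical helper mirroring SnapshotDefragService#createCheckpoint; for illustration only. */
final class CheckpointTruncationSketch {

  private CheckpointTruncationSketch() {
  }

  static void dropNonIncrementalTables(OmMetadataManagerImpl checkpointMetadataManager,
      Set<String> incrementalTables) throws IOException {
    DBStore dbStore = checkpointMetadataManager.getStore();
    for (String table : checkpointMetadataManager.listTableNames()) {
      if (!incrementalTables.contains(table)) {
        // dropTable permanently removes the column family; the checkpoint store must be
        // reopened (e.g. via OmMetadataManagerImpl.createCheckpointMetadataManager) before
        // the dropped table can be used again, as the RDBStore javadoc above warns.
        dbStore.dropTable(table);
      }
    }
  }
}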