diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 1d47fb72958f..db66fed22fe9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -277,6 +277,15 @@ public final class OzoneConfigKeys { OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s"; // 300s for default + public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT = + "ozone.snapshot.defrag.service.timeout"; + public static final String + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s"; + // TODO: Adjust timeout as needed. + // One concern would be that snapdiff can take a long time. + // If snapdiff wait time is included in the timeout it can make it indeterministic. + // -- So don't wait? Trigger and check later? + public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL = "ozone.snapshot.deleting.service.interval"; public static final String diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index c9064da1781c..cb4490c2c1db 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -520,6 +520,7 @@ public final class OzoneConsts { public static final String OM_SNAPSHOT_DIR = "db.snapshots"; public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "checkpointState"; + public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = "checkpointStateDefragged"; public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR + OM_KEY_PREFIX + "diffState"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 02f1e71bfcf7..b200c0b5bf1f 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3758,6 +3758,14 @@ Snapshot Deleting Service per run. + + ozone.snapshot.defrag.limit.per.task + 1 + OZONE, PERFORMANCE, OM + The maximum number of snapshots that would be defragmented in + each task run of snapshot defragmentation service. + + ozone.snapshot.filtering.service.interval 1m @@ -3765,6 +3773,13 @@ Time interval of the SST File filtering service from Snapshot. + + ozone.snapshot.defrag.service.interval + -1 + OZONE, PERFORMANCE, OM + Task interval of snapshot defragmentation service. + + ozone.om.snapshot.checkpoint.dir.creation.poll.timeout 20s @@ -3781,6 +3796,13 @@ A timeout value of sst filtering service. + + ozone.snapshot.defrag.service.timeout + 300s + OZONE, PERFORMANCE,OM + Timeout value of a run of snapshot defragmentation service. + + ozone.filesystem.snapshot.enabled diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java index e84854cae443..5aa561ba9486 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java @@ -28,7 +28,7 @@ /** * DumpFileWriter using rocksdb sst files. */ -class RDBSstFileWriter implements Closeable { +public class RDBSstFileWriter implements Closeable { private ManagedSstFileWriter sstFileWriter; private File sstFile; @@ -36,7 +36,7 @@ class RDBSstFileWriter implements Closeable { private ManagedOptions emptyOption = new ManagedOptions(); private final ManagedEnvOptions emptyEnvOptions = new ManagedEnvOptions(); - RDBSstFileWriter(File externalFile) throws RocksDatabaseException { + public RDBSstFileWriter(File externalFile) throws RocksDatabaseException { this.sstFileWriter = new ManagedSstFileWriter(emptyEnvOptions, emptyOption); this.keyCounter = new AtomicLong(0); this.sstFile = externalFile; @@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException { } } + public void delete(byte[] key) throws RocksDatabaseException { + try { + sstFileWriter.delete(key); + keyCounter.incrementAndGet(); + } catch (RocksDBException e) { + closeOnFailure(); + throw new RocksDatabaseException("Failed to delete key (length=" + key.length + + "), sstFile=" + sstFile.getAbsolutePath(), e); + } + } + @Override public void close() throws RocksDatabaseException { if (sstFileWriter != null) { diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 87bc9cb30170..969288ed92c8 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -428,11 +428,22 @@ public final class OMConfigKeys { "ozone.snapshot.deleting.limit.per.task"; public static final int SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT = 10; + // Snapshot defragmentation service configuration + public static final String SNAPSHOT_DEFRAG_LIMIT_PER_TASK = + "ozone.snapshot.defrag.limit.per.task"; + public static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT = 1; + public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL = "ozone.snapshot.filtering.service.interval"; public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s"; + public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL = + "ozone.snapshot.defrag.service.interval"; + public static final String + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "-1"; + // TODO: Disabled by default. Do not enable by default until upgrade handling is complete. + public static final String OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT = "ozone.om.snapshot.checkpoint.dir.creation.poll.timeout"; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 7e76885c49bd..872a99e94b15 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -346,6 +346,12 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId, */ SstFilteringService getSnapshotSstFilteringService(); + /** + * Returns the instance of Snapshot Defrag service. + * @return Background service. + */ + SnapshotDefragService getSnapshotDefragService(); + /** * Returns the instance of Snapshot Deleting service. * @return Background service. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 02949d5ee742..e458fa73236a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT; @@ -58,6 +60,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; @@ -202,6 +206,7 @@ public class KeyManagerImpl implements KeyManager { private KeyDeletingService keyDeletingService; private SstFilteringService snapshotSstFilteringService; + private SnapshotDefragService snapshotDefragService; private SnapshotDeletingService snapshotDeletingService; private final KeyProviderCryptoExtension kmsProvider; @@ -310,6 +315,11 @@ public void start(OzoneConfiguration configuration) { startSnapshotSstFilteringService(configuration); } + if (snapshotDefragService == null && + ozoneManager.isFilesystemSnapshotEnabled()) { + startSnapshotDefragService(configuration); + } + if (snapshotDeletingService == null && ozoneManager.isFilesystemSnapshotEnabled()) { @@ -393,6 +403,42 @@ public void stopSnapshotSstFilteringService() { } } + /** + * Start the snapshot defrag service if interval is not set to disabled value. + * @param conf + */ + public void startSnapshotDefragService(OzoneConfiguration conf) { + if (isDefragSvcEnabled()) { + long serviceInterval = conf.getTimeDuration( + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = conf.getTimeDuration( + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, + OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + snapshotDefragService = + new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS, + serviceTimeout, ozoneManager, conf); + snapshotDefragService.start(); + } else { + LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation will not run periodically."); + } + } + + /** + * Stop the snapshot defrag service if it is running. + */ + public void stopSnapshotDefragService() { + if (snapshotDefragService != null) { + snapshotDefragService.shutdown(); + snapshotDefragService = null; + } else { + LOG.info("SnapshotDefragService is already stopped or not started."); + } + } + private void startCompactionService(OzoneConfiguration configuration, boolean isCompactionServiceEnabled) { if (compactionService == null && isCompactionServiceEnabled) { @@ -419,7 +465,7 @@ KeyProviderCryptoExtension getKMSProvider() { } @Override - public void stop() throws IOException { + public void stop() { if (keyDeletingService != null) { keyDeletingService.shutdown(); keyDeletingService = null; @@ -436,6 +482,10 @@ public void stop() throws IOException { snapshotSstFilteringService.shutdown(); snapshotSstFilteringService = null; } + if (snapshotDefragService != null) { + snapshotDefragService.shutdown(); + snapshotDefragService = null; + } if (snapshotDeletingService != null) { snapshotDeletingService.shutdown(); snapshotDeletingService = null; @@ -450,6 +500,16 @@ public void stop() throws IOException { } } + /** + * Get the SnapshotDefragService instance. + * + * @return SnapshotDefragService instance, or null if not initialized + */ + @Override + public SnapshotDefragService getSnapshotDefragService() { + return snapshotDefragService; + } + private OmBucketInfo getBucketInfo(String volumeName, String bucketName) throws IOException { String bucketKey = metadataManager.getBucketKey(volumeName, bucketName); @@ -973,7 +1033,16 @@ public boolean isSstFilteringSvcEnabled() { // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay return serviceInterval > 0; } - + + public boolean isDefragSvcEnabled() { + long serviceInterval = ozoneManager.getConfiguration() + .getTimeDuration(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, + OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + // any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay + return serviceInterval > 0; + } + @Override public OmMultipartUploadList listMultipartUploads(String volumeName, String bucketName, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java index 543c4c6397cc..c376e9a332c0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java @@ -164,10 +164,11 @@ private final class ConstructSnapshotLocalData extends AbstractConstruct { public Object construct(Node node) { MappingNode mnode = (MappingNode) node; Map nodes = constructMapping(mnode); - UUID snapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID)); - UUID prevSnapId = UUID.fromString((String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID)); - OmSnapshotLocalData snapshotLocalData = new OmSnapshotLocalData(snapId, Collections.emptyList(), - prevSnapId); + final String snapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_SNAP_ID); + UUID snapId = UUID.fromString(snapIdStr); + final String prevSnapIdStr = (String) nodes.get(OzoneConsts.OM_SLD_PREV_SNAP_ID); + UUID prevSnapId = prevSnapIdStr != null ? UUID.fromString(prevSnapIdStr) : null; + OmSnapshotLocalData snapshotLocalData = new OmSnapshotLocalData(snapId, Collections.emptyList(), prevSnapId); // Set version from YAML Integer version = (Integer) nodes.get(OzoneConsts.OM_SLD_VERSION); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java new file mode 100644 index 000000000000..436593b861b6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background service for defragmenting snapshots in the active snapshot chain. + * When snapshots are taken, they capture the entire OM RocksDB state but may contain + * fragmented data. This service defragments snapshots by creating new compacted + * RocksDB instances with only the necessary data for tracked column families. + * + * The service processes snapshots in the active chain sequentially, starting with + * the first non-defragmented snapshot. For the first snapshot in the chain, it + * performs a full defragmentation by copying all keys. For subsequent snapshots, + * it uses incremental defragmentation based on diffs from the previous defragmented + * snapshot. + */ +public class SnapshotDefragService extends BackgroundService + implements BootstrapStateHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SnapshotDefragService.class); + + // Use only a single thread for snapshot defragmentation to avoid conflicts + private static final int DEFRAG_CORE_POOL_SIZE = 1; + + private final OzoneManager ozoneManager; + private final AtomicLong runCount = new AtomicLong(0); + + // Number of snapshots to be processed in a single iteration + private final long snapshotLimitPerTask; + + private final AtomicLong snapshotsDefraggedCount; + private final AtomicBoolean running; + + private final MultiSnapshotLocks snapshotIdLocks; + + private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); + + public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, + OzoneManager ozoneManager, OzoneConfiguration configuration) { + super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, + serviceTimeout, ozoneManager.getThreadNamePrefix()); + this.ozoneManager = ozoneManager; + this.snapshotLimitPerTask = configuration + .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, + SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT); + snapshotsDefraggedCount = new AtomicLong(0); + running = new AtomicBoolean(false); + IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock(); + this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true); + } + + @Override + public void start() { + running.set(true); + super.start(); + } + + @VisibleForTesting + public void pause() { + running.set(false); + } + + @VisibleForTesting + public void resume() { + running.set(true); + } + + /** + * Checks if rocks-tools native library is available. + */ + private boolean isRocksToolsNativeLibAvailable() { + try { + return ManagedRawSSTFileReader.tryLoadLibrary(); + } catch (Exception e) { + LOG.warn("Failed to check native code availability", e); + return false; + } + } + + /** + * Checks if a snapshot needs defragmentation by examining its YAML metadata. + */ + private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + try { + // Read snapshot local metadata from YAML + OmSnapshotLocalData snapshotLocalData = ozoneManager.getOmSnapshotManager() + .getSnapshotLocalDataManager() + .getOmSnapshotLocalData(snapshotInfo); + + // Check if snapshot needs compaction (defragmentation) + boolean needsDefrag = snapshotLocalData.getNeedsDefrag(); + LOG.debug("Snapshot {} needsDefragmentation field value: {}", + snapshotInfo.getName(), needsDefrag); + + return needsDefrag; + } catch (IOException e) { + LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed", + snapshotInfo.getName(), e); + return true; + } + } + + /** + * Performs full defragmentation for the first snapshot in the chain. + * This is a simplified implementation that demonstrates the concept. + */ + private void performFullDefragmentation(SnapshotInfo snapshotInfo, + OmSnapshot omSnapshot) throws IOException { + + // TODO: Implement full defragmentation + } + + /** + * Performs incremental defragmentation using diff from previous defragmented snapshot. + */ + private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot, + SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot) + throws IOException { + + // TODO: Implement incremental defragmentation + } + + private final class SnapshotDefragTask implements BackgroundTask { + + @Override + public BackgroundTaskResult call() throws Exception { + // Check OM leader and readiness + if (shouldRun()) { + final long count = runCount.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating Snapshot Defragmentation Task: run # {}", count); + } + triggerSnapshotDefragOnce(); + } + + return EmptyTaskResult.newResult(); + } + } + + public synchronized boolean triggerSnapshotDefragOnce() throws IOException { + // Check if rocks-tools native lib is available + if (!isRocksToolsNativeLibAvailable()) { + LOG.warn("Rocks-tools native library is not available. " + + "Stopping SnapshotDefragService."); + return false; + } + + Optional snapshotManager = Optional.ofNullable(ozoneManager) + .map(OzoneManager::getOmSnapshotManager); + if (!snapshotManager.isPresent()) { + LOG.debug("OmSnapshotManager not available, skipping defragmentation task"); + return false; + } + + // Get the SnapshotChainManager to iterate through the global snapshot chain + final SnapshotChainManager snapshotChainManager = + ((OmMetadataManagerImpl) ozoneManager.getMetadataManager()).getSnapshotChainManager(); + + final Table snapshotInfoTable = + ozoneManager.getMetadataManager().getSnapshotInfoTable(); + + // Use iterator(false) to iterate forward through the snapshot chain + Iterator snapshotIterator = snapshotChainManager.iterator(false); + + long snapshotLimit = snapshotLimitPerTask; + + while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) { + // Get SnapshotInfo for the current snapshot in the chain + UUID snapshotId = snapshotIterator.next(); + String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId); + SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey); + if (snapshotToDefrag == null) { + LOG.warn("Snapshot with ID '{}' not found in snapshot info table", snapshotId); + continue; + } + + // Skip deleted snapshots + if (snapshotToDefrag.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) { + LOG.debug("Skipping deleted snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + continue; + } + + // Check if this snapshot needs defragmentation + if (!needsDefragmentation(snapshotToDefrag)) { + LOG.debug("Skipping already defragged snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + continue; + } + + LOG.info("Will defrag snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + // Acquire MultiSnapshotLocks + if (!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId())) + .isLockAcquired()) { + LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + break; + } + + try { + LOG.info("Processing snapshot defragmentation for: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + // Get snapshot through SnapshotCache for proper locking + try (UncheckedAutoCloseableSupplier snapshotSupplier = + snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) { + + OmSnapshot omSnapshot = snapshotSupplier.get(); + + UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId(); + boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null; + if (isFirstSnapshotInPath) { + LOG.info("Performing full defragmentation for first snapshot (in path): {}", + snapshotToDefrag.getName()); + performFullDefragmentation(snapshotToDefrag, omSnapshot); + } else { + final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId); + SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey); + + LOG.info("Performing incremental defragmentation for snapshot: {} " + + "based on previous defragmented snapshot: {}", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + + // If previous path snapshot is not null, it must have been defragmented already + // Sanity check to ensure previous snapshot exists and is defragmented + if (needsDefragmentation(previousDefraggedSnapshot)) { + LOG.error("Fatal error before defragging snapshot: {}. " + + "Previous snapshot in path {} was not defragged while it is expected to be.", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + break; + } + + performIncrementalDefragmentation(snapshotToDefrag, + previousDefraggedSnapshot, omSnapshot); + } + + // TODO: Update snapshot metadata here? + + // Close and evict the original snapshot DB from SnapshotCache + // TODO: Implement proper eviction from SnapshotCache + LOG.info("Defragmentation completed for snapshot: {}", + snapshotToDefrag.getName()); + + snapshotLimit--; + snapshotsDefraggedCount.getAndIncrement(); + + } catch (OMException ome) { + if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) { + LOG.info("Snapshot {} was deleted during defragmentation", + snapshotToDefrag.getName()); + } else { + LOG.error("OMException during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), ome); + } + } + + } catch (Exception e) { + LOG.error("Exception during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), e); + return false; + } finally { + // Release lock MultiSnapshotLocks + snapshotIdLocks.releaseLock(); + LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + } + } + + return true; + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + // TODO: Can be parallelized for different buckets + queue.add(new SnapshotDefragTask()); + return queue; + } + + /** + * Returns true if the service run conditions are satisfied, false otherwise. + */ + private boolean shouldRun() { + if (ozoneManager == null) { + // OzoneManager can be null for testing + return true; + } + if (ozoneManager.getOmRatisServer() == null) { + LOG.warn("OzoneManagerRatisServer is not initialized yet"); + return false; + } + // The service only runs if current OM node is ready + return running.get() && ozoneManager.isRunning(); + } + + public AtomicLong getSnapshotsDefraggedCount() { + return snapshotsDefraggedCount; + } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } + + @Override + public void shutdown() { + running.set(false); + super.shutdown(); + } +} +
+ * The service processes snapshots in the active chain sequentially, starting with + * the first non-defragmented snapshot. For the first snapshot in the chain, it + * performs a full defragmentation by copying all keys. For subsequent snapshots, + * it uses incremental defragmentation based on diffs from the previous defragmented + * snapshot. + */ +public class SnapshotDefragService extends BackgroundService + implements BootstrapStateHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SnapshotDefragService.class); + + // Use only a single thread for snapshot defragmentation to avoid conflicts + private static final int DEFRAG_CORE_POOL_SIZE = 1; + + private final OzoneManager ozoneManager; + private final AtomicLong runCount = new AtomicLong(0); + + // Number of snapshots to be processed in a single iteration + private final long snapshotLimitPerTask; + + private final AtomicLong snapshotsDefraggedCount; + private final AtomicBoolean running; + + private final MultiSnapshotLocks snapshotIdLocks; + + private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); + + public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, + OzoneManager ozoneManager, OzoneConfiguration configuration) { + super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, + serviceTimeout, ozoneManager.getThreadNamePrefix()); + this.ozoneManager = ozoneManager; + this.snapshotLimitPerTask = configuration + .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, + SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT); + snapshotsDefraggedCount = new AtomicLong(0); + running = new AtomicBoolean(false); + IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock(); + this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true); + } + + @Override + public void start() { + running.set(true); + super.start(); + } + + @VisibleForTesting + public void pause() { + running.set(false); + } + + @VisibleForTesting + public void resume() { + running.set(true); + } + + /** + * Checks if rocks-tools native library is available. + */ + private boolean isRocksToolsNativeLibAvailable() { + try { + return ManagedRawSSTFileReader.tryLoadLibrary(); + } catch (Exception e) { + LOG.warn("Failed to check native code availability", e); + return false; + } + } + + /** + * Checks if a snapshot needs defragmentation by examining its YAML metadata. + */ + private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + try { + // Read snapshot local metadata from YAML + OmSnapshotLocalData snapshotLocalData = ozoneManager.getOmSnapshotManager() + .getSnapshotLocalDataManager() + .getOmSnapshotLocalData(snapshotInfo); + + // Check if snapshot needs compaction (defragmentation) + boolean needsDefrag = snapshotLocalData.getNeedsDefrag(); + LOG.debug("Snapshot {} needsDefragmentation field value: {}", + snapshotInfo.getName(), needsDefrag); + + return needsDefrag; + } catch (IOException e) { + LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed", + snapshotInfo.getName(), e); + return true; + } + } + + /** + * Performs full defragmentation for the first snapshot in the chain. + * This is a simplified implementation that demonstrates the concept. + */ + private void performFullDefragmentation(SnapshotInfo snapshotInfo, + OmSnapshot omSnapshot) throws IOException { + + // TODO: Implement full defragmentation + } + + /** + * Performs incremental defragmentation using diff from previous defragmented snapshot. + */ + private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot, + SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot) + throws IOException { + + // TODO: Implement incremental defragmentation + } + + private final class SnapshotDefragTask implements BackgroundTask { + + @Override + public BackgroundTaskResult call() throws Exception { + // Check OM leader and readiness + if (shouldRun()) { + final long count = runCount.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating Snapshot Defragmentation Task: run # {}", count); + } + triggerSnapshotDefragOnce(); + } + + return EmptyTaskResult.newResult(); + } + } + + public synchronized boolean triggerSnapshotDefragOnce() throws IOException { + // Check if rocks-tools native lib is available + if (!isRocksToolsNativeLibAvailable()) { + LOG.warn("Rocks-tools native library is not available. " + + "Stopping SnapshotDefragService."); + return false; + } + + Optional snapshotManager = Optional.ofNullable(ozoneManager) + .map(OzoneManager::getOmSnapshotManager); + if (!snapshotManager.isPresent()) { + LOG.debug("OmSnapshotManager not available, skipping defragmentation task"); + return false; + } + + // Get the SnapshotChainManager to iterate through the global snapshot chain + final SnapshotChainManager snapshotChainManager = + ((OmMetadataManagerImpl) ozoneManager.getMetadataManager()).getSnapshotChainManager(); + + final Table snapshotInfoTable = + ozoneManager.getMetadataManager().getSnapshotInfoTable(); + + // Use iterator(false) to iterate forward through the snapshot chain + Iterator snapshotIterator = snapshotChainManager.iterator(false); + + long snapshotLimit = snapshotLimitPerTask; + + while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) { + // Get SnapshotInfo for the current snapshot in the chain + UUID snapshotId = snapshotIterator.next(); + String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId); + SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey); + if (snapshotToDefrag == null) { + LOG.warn("Snapshot with ID '{}' not found in snapshot info table", snapshotId); + continue; + } + + // Skip deleted snapshots + if (snapshotToDefrag.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) { + LOG.debug("Skipping deleted snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + continue; + } + + // Check if this snapshot needs defragmentation + if (!needsDefragmentation(snapshotToDefrag)) { + LOG.debug("Skipping already defragged snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + continue; + } + + LOG.info("Will defrag snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + // Acquire MultiSnapshotLocks + if (!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId())) + .isLockAcquired()) { + LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + break; + } + + try { + LOG.info("Processing snapshot defragmentation for: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + // Get snapshot through SnapshotCache for proper locking + try (UncheckedAutoCloseableSupplier snapshotSupplier = + snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) { + + OmSnapshot omSnapshot = snapshotSupplier.get(); + + UUID pathPreviousSnapshotId = snapshotToDefrag.getPathPreviousSnapshotId(); + boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null; + if (isFirstSnapshotInPath) { + LOG.info("Performing full defragmentation for first snapshot (in path): {}", + snapshotToDefrag.getName()); + performFullDefragmentation(snapshotToDefrag, omSnapshot); + } else { + final String psIdtableKey = snapshotChainManager.getTableKey(pathPreviousSnapshotId); + SnapshotInfo previousDefraggedSnapshot = snapshotInfoTable.get(psIdtableKey); + + LOG.info("Performing incremental defragmentation for snapshot: {} " + + "based on previous defragmented snapshot: {}", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + + // If previous path snapshot is not null, it must have been defragmented already + // Sanity check to ensure previous snapshot exists and is defragmented + if (needsDefragmentation(previousDefraggedSnapshot)) { + LOG.error("Fatal error before defragging snapshot: {}. " + + "Previous snapshot in path {} was not defragged while it is expected to be.", + snapshotToDefrag.getName(), previousDefraggedSnapshot.getName()); + break; + } + + performIncrementalDefragmentation(snapshotToDefrag, + previousDefraggedSnapshot, omSnapshot); + } + + // TODO: Update snapshot metadata here? + + // Close and evict the original snapshot DB from SnapshotCache + // TODO: Implement proper eviction from SnapshotCache + LOG.info("Defragmentation completed for snapshot: {}", + snapshotToDefrag.getName()); + + snapshotLimit--; + snapshotsDefraggedCount.getAndIncrement(); + + } catch (OMException ome) { + if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) { + LOG.info("Snapshot {} was deleted during defragmentation", + snapshotToDefrag.getName()); + } else { + LOG.error("OMException during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), ome); + } + } + + } catch (Exception e) { + LOG.error("Exception during snapshot defragmentation for: {}", + snapshotToDefrag.getName(), e); + return false; + } finally { + // Release lock MultiSnapshotLocks + snapshotIdLocks.releaseLock(); + LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})", + snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId()); + + } + } + + return true; + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + // TODO: Can be parallelized for different buckets + queue.add(new SnapshotDefragTask()); + return queue; + } + + /** + * Returns true if the service run conditions are satisfied, false otherwise. + */ + private boolean shouldRun() { + if (ozoneManager == null) { + // OzoneManager can be null for testing + return true; + } + if (ozoneManager.getOmRatisServer() == null) { + LOG.warn("OzoneManagerRatisServer is not initialized yet"); + return false; + } + // The service only runs if current OM node is ready + return running.get() && ozoneManager.isRunning(); + } + + public AtomicLong getSnapshotsDefraggedCount() { + return snapshotsDefraggedCount; + } + + @Override + public BootstrapStateHandler.Lock getBootstrapStateLock() { + return lock; + } + + @Override + public void shutdown() { + running.set(false); + super.shutdown(); + } +} +