From 8423ac7618dab643dd9987cfb239afcb4925021d Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Tue, 7 Oct 2025 13:32:53 -0700
Subject: [PATCH 01/17] HDDS-13009. Background snapshot defrag service
From 838cd436770cdf74357f660836c2eb705f61ce80 Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Mon, 18 Aug 2025 21:09:55 -0700
Subject: [PATCH 02/17] Make `RDBSstFileWriter` public to be used in defrag
service.
(cherry picked from commit f81e5b7df67d3cec416f95f6cacdcc98ba3e924b)
---
.../org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index e84854cae443..aad9232844fc 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -28,7 +28,7 @@
/**
* DumpFileWriter using rocksdb sst files.
*/
-class RDBSstFileWriter implements Closeable {
+public class RDBSstFileWriter implements Closeable {
private ManagedSstFileWriter sstFileWriter;
private File sstFile;
@@ -36,7 +36,7 @@ class RDBSstFileWriter implements Closeable {
private ManagedOptions emptyOption = new ManagedOptions();
private final ManagedEnvOptions emptyEnvOptions = new ManagedEnvOptions();
- RDBSstFileWriter(File externalFile) throws RocksDatabaseException {
+ public RDBSstFileWriter(File externalFile) throws RocksDatabaseException {
this.sstFileWriter = new ManagedSstFileWriter(emptyEnvOptions, emptyOption);
this.keyCounter = new AtomicLong(0);
this.sstFile = externalFile;
From 61b9ffc034aa65d83611a3244c4df046155277bf Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Mon, 18 Aug 2025 21:10:16 -0700
Subject: [PATCH 03/17] Implement `delete(key)` in `RDBSstFileWriter`.
(cherry picked from commit e15a62d330cb6b2ef479bed1f4e7524368feb9c3)
---
.../apache/hadoop/hdds/utils/db/RDBSstFileWriter.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
index aad9232844fc..5aa561ba9486 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java
@@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException {
}
}
+ public void delete(byte[] key) throws RocksDatabaseException { // Writes a delete entry for key through the SST file writer.
+ try {
+ sstFileWriter.delete(key);
+ keyCounter.incrementAndGet(); // The delete entry is counted in keyCounter.
+ } catch (RocksDBException e) {
+ closeOnFailure(); // NOTE(review): presumably releases the writer before the rethrow -- confirm.
+ throw new RocksDatabaseException("Failed to delete key (length=" + key.length // reports the key length, not the key bytes
+ + "), sstFile=" + sstFile.getAbsolutePath(), e);
+ }
+ }
+
@Override
public void close() throws RocksDatabaseException {
if (sstFileWriter != null) {
From 73625a8b1da6bf788eb35be955ab098e79b0363f Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Mon, 18 Aug 2025 21:11:17 -0700
Subject: [PATCH 04/17] Add config `ozone.snapshot.defrag.service.timeout`,
`ozone.snapshot.defrag.limit.per.task`,
`ozone.snapshot.defrag.service.interval`.
(cherry picked from commit c04d67af09d3d964ad430032659ff44941ec2729)
---
.../java/org/apache/hadoop/ozone/OzoneConfigKeys.java | 6 ++++++
.../java/org/apache/hadoop/ozone/om/OMConfigKeys.java | 10 ++++++++++
2 files changed, 16 insertions(+)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1d47fb72958f..92b4ee7c2f89 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -277,6 +277,12 @@ public final class OzoneConfigKeys {
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_TIMEOUT_DEFAULT = "300s";
// 300s for default
+ public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT =
+ "ozone.snapshot.defrag.service.timeout";
+ public static final String
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s";
+ // 300s for default
+
public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL =
"ozone.snapshot.deleting.service.interval";
public static final String
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 87bc9cb30170..80f84fb3abce 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -428,11 +428,21 @@ public final class OMConfigKeys {
"ozone.snapshot.deleting.limit.per.task";
public static final int SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT = 10;
+ // Snapshot defragmentation service configuration
+ public static final String SNAPSHOT_DEFRAG_LIMIT_PER_TASK =
+ "ozone.snapshot.defrag.limit.per.task";
+ public static final int SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT = 1;
+
public static final String OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL =
"ozone.snapshot.filtering.service.interval";
public static final String
OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT = "60s";
+ public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL =
+ "ozone.snapshot.defrag.service.interval";
+ public static final String
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "60s";
+
public static final String
OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT =
"ozone.om.snapshot.checkpoint.dir.creation.poll.timeout";
From f442502dd0ac61eac563ffc95d798cfe62dd07ba Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Mon, 6 Oct 2025 08:41:01 -0700
Subject: [PATCH 05/17] Add comments/TODOs for defrag service interval/timeout
---
.../main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java | 5 ++++-
.../main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 ++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 92b4ee7c2f89..db66fed22fe9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -281,7 +281,10 @@ public final class OzoneConfigKeys {
"ozone.snapshot.defrag.service.timeout";
public static final String
OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT = "300s";
- // 300s for default
+ // TODO: Adjust timeout as needed.
+ // One concern would be that snapdiff can take a long time.
+ // If the snapdiff wait time counts toward the timeout, the timeout becomes nondeterministic.
+ // -- So don't wait? Trigger and check later?
public static final String OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL =
"ozone.snapshot.deleting.service.interval";
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 80f84fb3abce..969288ed92c8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -441,7 +441,8 @@ public final class OMConfigKeys {
public static final String OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL =
"ozone.snapshot.defrag.service.interval";
public static final String
- OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "60s";
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT = "-1";
+ // TODO: Disabled by default. Do not enable by default until upgrade handling is complete.
public static final String
OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT =
From 8e37e5d6ce091af5a27e1947629213488c9e2697 Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Mon, 18 Aug 2025 21:15:04 -0700
Subject: [PATCH 06/17] Implement SnapshotDefragService
(cherry picked from commit ac5839424248373c7a4fded963b54c18b68b2b4e)
---
.../org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../apache/hadoop/ozone/om/KeyManager.java | 6 +
.../hadoop/ozone/om/KeyManagerImpl.java | 72 +-
.../ozone/om/SnapshotDefragService.java | 1135 +++++++++++++++++
4 files changed, 1212 insertions(+), 2 deletions(-)
create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 0b18d8aef674..5e4002e8ea06 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -517,6 +517,7 @@ public final class OzoneConsts {
public static final String OM_SNAPSHOT_DIR = "db.snapshots";
public static final String OM_SNAPSHOT_CHECKPOINT_DIR = OM_SNAPSHOT_DIR
+ OM_KEY_PREFIX + "checkpointState";
+ public static final String OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR = "checkpointStateDefragged";
public static final String OM_SNAPSHOT_DIFF_DIR = OM_SNAPSHOT_DIR
+ OM_KEY_PREFIX + "diffState";
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 7e76885c49bd..872a99e94b15 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -346,6 +346,12 @@ DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
*/
SstFilteringService getSnapshotSstFilteringService();
+ /**
+ * Returns the instance of Snapshot Defrag service.
+ * @return Background service; KeyManagerImpl returns null while the service has not been started.
+ */
+ SnapshotDefragService getSnapshotDefragService();
+
/**
* Returns the instance of Snapshot Deleting service.
* @return Background service.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d48e15cf66df..d4a79421c60e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -30,6 +30,8 @@
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
@@ -58,6 +60,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
@@ -200,6 +204,7 @@ public class KeyManagerImpl implements KeyManager {
private KeyDeletingService keyDeletingService;
private SstFilteringService snapshotSstFilteringService;
+ private SnapshotDefragService snapshotDefragService;
private SnapshotDeletingService snapshotDeletingService;
private final KeyProviderCryptoExtension kmsProvider;
@@ -308,6 +313,11 @@ public void start(OzoneConfiguration configuration) {
startSnapshotSstFilteringService(configuration);
}
+ if (snapshotDefragService == null &&
+ ozoneManager.isFilesystemSnapshotEnabled()) {
+ startSnapshotDefragService(configuration);
+ }
+
if (snapshotDeletingService == null &&
ozoneManager.isFilesystemSnapshotEnabled()) {
@@ -391,6 +401,42 @@ public void stopSnapshotSstFilteringService() {
}
}
+ /**
+ * Creates and starts the snapshot defrag service when isDefragSvcEnabled() reports a positive interval; otherwise only logs.
+ * @param conf configuration from which the service interval and timeout are read, both in milliseconds
+ */
+ public void startSnapshotDefragService(OzoneConfiguration conf) {
+ if (isDefragSvcEnabled()) { // NOTE(review): enablement is read from ozoneManager.getConfiguration(), the interval below from conf.
+ long serviceInterval = conf.getTimeDuration(
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = conf.getTimeDuration(
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT,
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ snapshotDefragService =
+ new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
+ serviceTimeout, ozoneManager, conf);
+ snapshotDefragService.start();
+ } else {
+ LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation will not run periodically.");
+ }
+ }
+
+ /**
+ * Shuts down the snapshot defrag service and clears the reference; only logs when there is no instance.
+ */
+ public void stopSnapshotDefragService() {
+ if (snapshotDefragService != null) {
+ snapshotDefragService.shutdown();
+ snapshotDefragService = null; // cleared so that a later start() sees no running instance
+ } else {
+ LOG.info("SnapshotDefragService is already stopped or not started.");
+ }
+ }
+
private void startCompactionService(OzoneConfiguration configuration,
boolean isCompactionServiceEnabled) {
if (compactionService == null && isCompactionServiceEnabled) {
@@ -417,7 +463,7 @@ KeyProviderCryptoExtension getKMSProvider() {
}
@Override
- public void stop() throws IOException {
+ public void stop() {
if (keyDeletingService != null) {
keyDeletingService.shutdown();
keyDeletingService = null;
@@ -434,6 +480,10 @@ public void stop() throws IOException {
snapshotSstFilteringService.shutdown();
snapshotSstFilteringService = null;
}
+ if (snapshotDefragService != null) {
+ snapshotDefragService.shutdown();
+ snapshotDefragService = null;
+ }
if (snapshotDeletingService != null) {
snapshotDeletingService.shutdown();
snapshotDeletingService = null;
@@ -448,6 +498,15 @@ public void stop() throws IOException {
}
}
+ /**
+  * Returns the SnapshotDefragService instance held by this KeyManagerImpl.
+  * @return SnapshotDefragService instance, or null if not initialized
+  */
+ @Override
+ public SnapshotDefragService getSnapshotDefragService() {
+   return snapshotDefragService;
+ }
+
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
throws IOException {
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -968,7 +1027,16 @@ public boolean isSstFilteringSvcEnabled() {
// any interval <= 0 causes IllegalArgumentException from scheduleWithFixedDelay
return serviceInterval > 0;
}
-
+
+ public boolean isDefragSvcEnabled() { // true when the configured defrag service interval is positive
+ long serviceInterval = ozoneManager.getConfiguration()
+ .getTimeDuration(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL,
+ OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ // A non-positive interval means disabled; any interval <= 0 would also cause IllegalArgumentException from scheduleWithFixedDelay
+ return serviceInterval > 0;
+ }
+
@Override
public OmMultipartUploadList listMultipartUploads(String volumeName,
String bucketName,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
new file mode 100644
index 000000000000..3e0195715518
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -0,0 +1,1135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may contain
+ * fragmented data. This service defragments snapshots by creating new compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ *
+ * The service processes snapshots in the active chain sequentially, starting with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent snapshots,
+ * it uses incremental defragmentation based on diffs from the previous defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private static final String CHECKPOINT_STATE_DEFRAGGED_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; // directory name used when building defragmented DB paths
+ private static final String TEMP_DIFF_DIR = "tempDiffSstFiles"; // TODO: Put this in OzoneConsts?
+
+ private final OzoneManager ozoneManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration (read from SNAPSHOT_DEFRAG_LIMIT_PER_TASK in the constructor)
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount; // starts at 0, set in the constructor
+ private final AtomicBoolean running; // false initially and after pause(); true after start() or resume()
+
+ private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, // pool size constant is 1
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration // per-iteration snapshot limit, falling back to the default constant
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false); // stays false until start() or resume() sets it
+ }
+
+ @Override
+ public void start() {
+ running.set(true); // raise the running flag before delegating to super.start()
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false); // only clears the flag; NOTE(review): presumably checked by the background task -- confirm
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true); // only sets the flag again; the service itself is not restarted here
+ }
+
+ /**
+ * Returns the result of ManagedRawSSTFileReader.tryLoadLibrary(); an Exception from it is logged at WARN and reported as false.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Returns the needsDefrag flag stored in the snapshot's YAML file (snapshot path + ".yaml"), or true when that file cannot be read.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Load the YAML file located at the snapshot path with a ".yaml" suffix appended
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile);
+
+ // The stored flag is returned as-is
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed",
+ snapshotInfo.getName(), e);
+ return true; // unreadable metadata is treated as "needs defragmentation"
+ }
+ }
+
+ /**
+  * Scans snapshotInfoTable from its first entry and returns the first snapshot that is not
+  * marked SNAPSHOT_DELETED and for which needsDefragmentation(SnapshotInfo) is true.
+  * Entries are visited in the table's iteration order.
+  *
+  * @param snapshotInfoTable table of snapshot metadata to scan
+  * @return the first matching snapshot, or null when no snapshot needs defragmentation
+  * @throws IOException if iterating over the table fails
+  */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
+     Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+   LOG.debug("Searching for first snapshot needing defragmentation in active chain");
+
+   try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iterator =
+       snapshotInfoTable.iterator()) {
+     iterator.seekToFirst();
+     while (iterator.hasNext()) {
+       SnapshotInfo snapshotInfo = iterator.next().getValue();
+
+       // Skip deleted snapshots
+       if (snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+         LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName());
+         continue;
+       }
+
+       if (needsDefragmentation(snapshotInfo)) {
+         LOG.info("Found snapshot needing defragmentation: {} (ID: {})",
+             snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+         return snapshotInfo;
+       }
+       LOG.debug("Snapshot {} already defragmented, continuing search", snapshotInfo.getName());
+     }
+   }
+
+   LOG.debug("No snapshots found needing defragmentation");
+   return null;
+ }
+
+ /**
+  * Walks back from currentSnapshot via getPathPreviousSnapshotId() and returns the nearest
+  * predecessor for which needsDefragmentation is false. The walk ends early when an ID is
+  * absent from snapshotInfoTable, because rescanning for the same ID could never terminate.
+  *
+  * @param currentSnapshot snapshot whose predecessors are searched
+  * @param snapshotInfoTable snapshot metadata table, scanned once per step of the walk
+  * @return the nearest defragmented predecessor, or null if none is found
+  * @throws IOException if iterating over the table fails
+  */
+ private SnapshotInfo findPreviousDefraggedSnapshot(SnapshotInfo currentSnapshot,
+     Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+   LOG.debug("Searching for previous defragmented snapshot before: {}", currentSnapshot.getName());
+   // Walk backwards through the snapshot chain using pathPreviousSnapshotId
+   String previousSnapshotId = currentSnapshot.getPathPreviousSnapshotId() != null ?
+       currentSnapshot.getPathPreviousSnapshotId().toString() : null;
+   while (previousSnapshotId != null) {
+     SnapshotInfo previous = null;
+     try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iterator =
+         snapshotInfoTable.iterator()) {
+       iterator.seekToFirst();
+       while (previous == null && iterator.hasNext()) {
+         SnapshotInfo candidate = iterator.next().getValue();
+         previous = candidate.getSnapshotId().toString().equals(previousSnapshotId) ? candidate : null;
+       }
+     }
+     if (previous == null) {
+       LOG.warn("Snapshot {} not found in snapshotInfoTable, stopping search", previousSnapshotId);
+       break;
+     }
+     if (!needsDefragmentation(previous)) {
+       LOG.info("Found previous defragmented snapshot: {} (ID: {})", previous.getName(), previous.getSnapshotId());
+       return previous;
+     }
+     previousSnapshotId = previous.getPathPreviousSnapshotId() != null ?
+         previous.getPathPreviousSnapshotId().toString() : null;
+   }
+
+   LOG.debug("No previous defragmented snapshot found");
+   return null;
+ }
+
+ /**
+  * Performs full defragmentation of a snapshot: copies every key of the tracked column
+  * families into a new RocksDB under the checkpointStateDefragged directory, compacts it and
+  * verifies it against the original. The new DB is opened in try-with-resources so that a
+  * failure while copying, compacting or verifying cannot leave the store open.
+  *
+  * @param snapshotInfo metadata of the snapshot to defragment
+  * @param omSnapshot opened snapshot whose metadata store is the copy source
+  * @throws IOException if any step of the defragmentation fails
+  */
+ private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+     OmSnapshot omSnapshot) throws IOException {
+
+   String snapshotPath = OmSnapshotManager.getSnapshotPath(
+       ozoneManager.getConfiguration(), snapshotInfo);
+
+   // For defraggedDbPath, we need to go up to the parent directory and use checkpointStateDefragged
+   String parentDir = Paths.get(snapshotPath).getParent().getParent().toString();
+   String checkpointDirName = Paths.get(snapshotPath).getFileName().toString();
+   String defraggedDbPath = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, checkpointDirName).toString();
+
+   LOG.info("Starting full defragmentation for snapshot: {} at path: {}",
+       snapshotInfo.getName(), snapshotPath);
+   LOG.info("Target defragmented DB path: {}", defraggedDbPath);
+
+   // Create defragmented directory
+   Files.createDirectories(Paths.get(defraggedDbPath));
+
+   // TODO: Get snapshot checkpoint DB via SnapshotCache
+   RDBStore originalStore = (RDBStore) omSnapshot.getMetadataManager().getStore();
+   RocksDatabase originalDb = originalStore.getDb();
+
+   LOG.info("Starting defragmentation process for snapshot: {}", snapshotInfo.getName());
+   LOG.info("Original DB path: {}", snapshotPath);
+   LOG.info("Defragmented DB path: {}", defraggedDbPath);
+
+   // Implement actual RocksDB defragmentation
+   try {
+     // 1. Create a new RocksDB instance at defraggedDbPath
+     DBStoreBuilder defraggedDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration())
+         .setName(checkpointDirName)
+         .setPath(Paths.get(defraggedDbPath).getParent())
+         .setCreateCheckpointDirs(false);
+
+     // Add all the tracked column families to the new DB
+     for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+       defraggedDbBuilder.addTable(cfName);
+       LOG.debug("Added column family {} to defragmented DB", cfName);
+     }
+
+     long totalKeysCopied = 0;
+     // Build the new defragmented database; try-with-resources closes it even when a later step throws
+     try (DBStore defraggedStore = defraggedDbBuilder.build()) {
+       RocksDatabase defraggedDb = ((RDBStore) defraggedStore).getDb();
+
+       LOG.info("Created new defragmented DB instance for snapshot: {}", snapshotInfo.getName());
+
+       // 2. & 3. Iterate through tracked column families and copy all key-value pairs
+       for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+         LOG.info("Starting defragmentation of column family: {} for snapshot: {}",
+             cfName, snapshotInfo.getName());
+
+         RocksDatabase.ColumnFamily originalCf = originalDb.getColumnFamily(cfName);
+         RocksDatabase.ColumnFamily defraggedCf = defraggedDb.getColumnFamily(cfName);
+
+         if (originalCf == null) {
+           LOG.warn("Column family {} not found in original DB, skipping", cfName);
+           continue;
+         }
+         if (defraggedCf == null) {
+           LOG.warn("Column family {} not found in defragmented DB, skipping", cfName);
+           continue;
+         }
+
+         long cfKeysCopied = 0;
+         try (ManagedWriteBatch writeBatch = new ManagedWriteBatch();
+             ManagedRocksIterator iterator = originalDb.newIterator(originalCf)) {
+           iterator.get().seekToFirst();
+           while (iterator.get().isValid()) {
+             byte[] key = iterator.get().key();
+             byte[] value = iterator.get().value();
+
+             // Add to batch for efficient writing
+             defraggedCf.batchPut(writeBatch, key, value);
+             cfKeysCopied++;
+
+             // Commit batch every 1000 keys to avoid memory issues
+             if (cfKeysCopied % 1000 == 0) {
+               defraggedDb.batchWrite(writeBatch);
+               writeBatch.clear();
+               LOG.debug("Copied {} keys for column family {} so far", cfKeysCopied, cfName);
+             }
+
+             iterator.get().next();
+           }
+
+           // Commit any remaining keys in the batch
+           if (writeBatch.count() > 0) {
+             defraggedDb.batchWrite(writeBatch);
+           }
+
+           totalKeysCopied += cfKeysCopied;
+           LOG.info("Completed copying {} keys for column family: {} in snapshot: {}",
+               cfKeysCopied, cfName, snapshotInfo.getName());
+         }
+       }
+
+       LOG.info("Copied total of {} keys across all column families for snapshot: {}",
+           totalKeysCopied, snapshotInfo.getName());
+
+       // 4. Perform compaction on the new DB to ensure it's fully defragmented
+       LOG.info("Starting compaction of defragmented DB for snapshot: {}", snapshotInfo.getName());
+       try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) {
+         compactOptions.setChangeLevel(true);
+         compactOptions.setTargetLevel(1);
+         defraggedDb.compactDB(compactOptions);
+       }
+       LOG.info("Completed compaction of defragmented DB for snapshot: {}", snapshotInfo.getName());
+
+       // 5. Verify data integrity between original and defragmented DBs
+       verifyDbIntegrity(originalDb, defraggedDb, snapshotInfo);
+     }
+
+     // TODO: Rename om.db to the om.db-UUID (at least for full defrag)
+
+     LOG.info("Successfully completed full defragmentation for snapshot: {} with {} keys copied",
+         snapshotInfo.getName(), totalKeysCopied);
+   } catch (RocksDatabaseException e) {
+     LOG.error("RocksDB error during defragmentation of snapshot: {}", snapshotInfo.getName(), e);
+     throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e);
+   } catch (Exception e) {
+     LOG.error("Unexpected error during defragmentation of snapshot: {}", snapshotInfo.getName(), e);
+     throw new IOException("Failed to defragment snapshot: " + snapshotInfo.getName(), e);
+   }
+ }
+
+  /**
+   * Performs incremental defragmentation of {@code currentSnapshot}, using the
+   * defragmented DB of {@code previousDefraggedSnapshot} as the starting point.
+   *
+   * <p>Steps: (1) create a checkpoint of the previous defragmented DB directly at the
+   * target location of the current snapshot, (2) open that checkpoint as the working
+   * DB, (3) apply the snapshot diff between the two snapshots to it, and (4) verify
+   * the result against the current snapshot DB.
+   *
+   * <p>If the previous defragmented DB does not exist, or an unexpected (non-RocksDB)
+   * error occurs, this falls back to {@code performFullDefragmentation}. A RocksDB
+   * error is rethrown as an {@link IOException}, as {@code performFullDefragmentation}
+   * does, so that the caller does not record the snapshot as defragmented.
+   *
+   * @param currentSnapshot snapshot to defragment
+   * @param previousDefraggedSnapshot snapshot whose defragmented DB is used as the base
+   * @param currentOmSnapshot opened handle of {@code currentSnapshot}
+   * @throws IOException if a RocksDB error occurs, or if the fallback full
+   *     defragmentation fails
+   */
+  private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+      SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+      throws IOException {
+
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Defragmented DB location:
+    // <grandparent of snapshot path>/<CHECKPOINT_STATE_DEFRAGGED_DIR>/<checkpoint dir name>
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    // Without a previous defragmented DB there is nothing to build on. This check is
+    // done outside the try block so that a failure of the full defragmentation is
+    // propagated as-is instead of being caught below and retried a second time.
+    if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+      LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+          previousDefraggedDbPath);
+      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+      return;
+    }
+
+    try {
+      // 1. Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      // Open the previous defragmented DB to create checkpoint.
+      // TODO: via SnapshotCache or something equivalent for lock protection
+      DBStoreBuilder previousDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration())
+          .setName(previousCheckpointDirName)
+          .setPath(Paths.get(previousDefraggedDbPath).getParent())
+          .setOpenReadOnly(true)
+          .setCreateCheckpointDirs(false);
+
+      // Add tracked column families
+      for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+        previousDbBuilder.addTable(cfName);
+      }
+
+      try (DBStore previousDefraggedStore = previousDbBuilder.build()) {
+        RocksDatabase previousDefraggedDb = ((RDBStore) previousDefraggedStore).getDb();
+
+        // Create checkpoint directly at the target location
+        try (RocksDatabase.RocksCheckpoint checkpoint = previousDefraggedDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint directly at target: {}", currentDefraggedDbPath);
+        }
+      }
+
+      // 2. Open the checkpoint as our working defragmented DB and apply incremental changes
+      DBStoreBuilder currentDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration())
+          .setName(currentCheckpointDirName)
+          .setPath(Paths.get(currentDefraggedDbPath).getParent())
+          .setCreateCheckpointDirs(false);
+
+      // Add tracked column families
+      for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+        currentDbBuilder.addTable(cfName);
+      }
+
+      long incrementalKeysCopied;
+      // try-with-resources closes the working DB even when applying the changes or the
+      // verification throws; previously the store leaked on every failure path.
+      try (DBStore currentDefraggedStore = currentDbBuilder.build()) {
+        RocksDatabase currentDefraggedDb = ((RDBStore) currentDefraggedStore).getDb();
+
+        LOG.info("Opened checkpoint as working defragmented DB for incremental update");
+
+        // 3. Apply incremental changes from current snapshot
+        RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore();
+        RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb();
+
+        incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore,
+            currentSnapshot, previousDefraggedSnapshot);
+
+        LOG.info("Applied {} incremental changes for snapshot: {}",
+            incrementalKeysCopied, currentSnapshot.getName());
+
+        // 4. Compaction of the incrementally updated DB is currently disabled.
+
+        // 5. Verify data integrity
+        verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
+      }
+
+      LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes",
+          currentSnapshot.getName(), incrementalKeysCopied);
+
+    } catch (RocksDatabaseException e) {
+      LOG.error("RocksDB error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+      // Do not swallow: returning normally here would let the caller mark the snapshot
+      // as defragmented although the defragmented DB was not produced.
+      throw new IOException("Failed to incrementally defragment snapshot: " + currentSnapshot.getName(), e);
+    } catch (Exception e) {
+      LOG.error("Unexpected error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+      LOG.warn("Falling back to full defragmentation due to error");
+      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    }
+  }
+
+ /**
+ * Applies incremental changes by using snapshotDiff to compute the diff list,
+ * then iterating that diff list against the current snapshot checkpoint DB.
+ * Uses RDBSstFileWriter to directly write changes to SST files and then ingests them.
+ *
+ * <p>For each page of the diff report and each tracked column family, one temporary
+ * SST file is written, ingested into the matching target table when it contains
+ * changes, and then deleted.
+ *
+ * @param currentSnapshotDb DB of the current snapshot; values of created, modified and
+ *     renamed keys are read from it
+ * @param targetStore store that receives the ingested SST files (cast to RDBStore)
+ * @param currentSnapshot snapshot being defragmented ("to" side of the diff)
+ * @param previousSnapshot previously defragmented snapshot ("from" side of the diff)
+ * @return number of changes counted across all column families and diff pages
+ * @throws RocksDatabaseException if the snapshot diff cannot be computed or does not
+ *     reach DONE, or if an IOException is raised inside the outer try block
+ */
+ @SuppressWarnings("checkstyle:MethodLength")
+ private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore,
+ SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws RocksDatabaseException {
+
+ LOG.info("Applying incremental changes for snapshot: {} since: {} using snapshotDiff approach",
+ currentSnapshot.getName(), previousSnapshot.getName());
+
+ long totalChanges = 0;
+
+ // Create temporary directory for SST files
+ // Location: <grandparent of snapshot path>/<CHECKPOINT_STATE_DEFRAGGED_DIR>/<TEMP_DIFF_DIR>
+ String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), currentSnapshot);
+ String parentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString();
+ String tempSstDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, TEMP_DIFF_DIR).toString();
+
+ try {
+ Files.createDirectories(Paths.get(tempSstDir));
+ LOG.info("Created temporary SST directory: {}", tempSstDir);
+
+ // Use snapshotDiff to compute the diff list between previous and current snapshot
+ LOG.info("Computing snapshot diff between {} and {}",
+ previousSnapshot.getName(), currentSnapshot.getName());
+
+ SnapshotDiffResponse diffResponse;
+ try {
+ // Call snapshotDiff to get the diff list
+ diffResponse = ozoneManager.snapshotDiff(
+ currentSnapshot.getVolumeName(),
+ currentSnapshot.getBucketName(),
+ previousSnapshot.getName(),
+ currentSnapshot.getName(),
+ null, // token - start from beginning
+ 0, // pageSize - get all diffs at once
+ false, // forceFullDiff
+ false // disableNativeDiff
+ );
+
+ // Wait for snapshotDiff computation to complete if it's still in progress
+ while (diffResponse.getJobStatus() == SnapshotDiffResponse.JobStatus.IN_PROGRESS ||
+ diffResponse.getJobStatus() == SnapshotDiffResponse.JobStatus.QUEUED) {
+ LOG.info("Snapshot diff computation in progress, waiting {} ms...",
+ diffResponse.getWaitTimeInMs());
+ // TODO: This can be improved by triggering snapdiff first, before any locks are grabbed,
+ // so that we don't have to wait here
+ // NOTE(review): an InterruptedException thrown by this sleep is caught by the
+ // catch (Exception) below and wrapped, without restoring the thread's interrupt flag.
+ Thread.sleep(diffResponse.getWaitTimeInMs());
+
+ // Poll for updated status
+ diffResponse = ozoneManager.snapshotDiff(
+ currentSnapshot.getVolumeName(),
+ currentSnapshot.getBucketName(),
+ previousSnapshot.getName(),
+ currentSnapshot.getName(),
+ null, // token
+ 0, // pageSize
+ false, // forceFullDiff
+ false // disableNativeDiff
+ );
+ }
+
+ if (diffResponse.getJobStatus() != SnapshotDiffResponse.JobStatus.DONE) {
+ throw new RocksDatabaseException("Snapshot diff computation failed with status: " +
+ diffResponse.getJobStatus() + ", reason: " + diffResponse.getReason());
+ }
+
+ LOG.info("Snapshot diff computation completed successfully");
+
+ } catch (Exception e) {
+ // Also catches the RocksDatabaseException thrown just above for a non-DONE status,
+ // which is then wrapped in a second RocksDatabaseException.
+ throw new RocksDatabaseException("Failed to compute snapshot diff", e);
+ }
+
+ SnapshotDiffReportOzone diffReport = diffResponse.getSnapshotDiffReport();
+ if (diffReport == null || diffReport.getDiffList() == null) {
+ LOG.info("No differences found between snapshots, no changes to apply");
+ // NOTE(review): this early return skips the temporary directory cleanup done at
+ // the end of the try block.
+ return 0;
+ }
+
+ // TODO: Handle pagination when diffList is bigger than server page size
+ // 2025-08-16 09:10:52,500 [IPC Server handler 1 on default port 9862] INFO om.SnapshotDefragService: Found 1000 differences to process
+ LOG.info("Found {} differences to process", diffReport.getDiffList().size());
+
+ // Get table references for target database
+ RDBStore targetRdbStore = (RDBStore) targetStore;
+ RocksDatabase targetDb = targetRdbStore.getDb();
+
+ // Number of diff entries consumed so far; its string form is passed as the token
+ // when requesting the next page.
+ int nextToken = 0;
+ while (diffReport.getDiffList() != null && !diffReport.getDiffList().isEmpty()) {
+ final List diffList = diffReport.getDiffList();
+
+ // Group diff entries by column family and process each CF separately
+ // (the whole diff page is scanned once per tracked column family)
+ // TODO: Use bucket layout to determine which column families to process
+ for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+ RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName);
+ RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName);
+
+ if (currentCf == null || targetCf == null) {
+ LOG.warn("Column family {} not found, skipping incremental changes", cfName);
+ continue;
+ }
+
+ Table targetTable = targetRdbStore.getTable(cfName);
+ if (targetTable == null) {
+ LOG.warn("Table {} not found in target store, skipping", cfName);
+ continue;
+ }
+
+ long cfChanges = 0;
+ // The same file name is reused for every page; the file is deleted after each ingestion.
+ String sstFileName = cfName + "_" + currentSnapshot.getSnapshotId() + ".sst";
+ File sstFile = new File(tempSstDir, sstFileName);
+
+ LOG.debug("Creating SST file for column family {} changes: {}", cfName, sstFile.getAbsolutePath());
+
+ // Use RDBSstFileWriter to write changes to SST file
+ try (RDBSstFileWriter sstWriter = new RDBSstFileWriter(sstFile)) {
+
+ // Iterate through the diff list and process each entry
+ // (entries are written in diff-list order; see the ordering TODO in the DELETE case)
+ for (DiffReportEntry diffEntry : diffList) {
+ String sourcePath = new String(diffEntry.getSourcePath(), StandardCharsets.UTF_8);
+
+ // Extract the key from the path using volume and bucket from snapshot context
+ byte[] key = extractKeyFromPath(sourcePath, cfName,
+ currentSnapshot.getVolumeName(), currentSnapshot.getBucketName());
+ if (key == null) {
+ continue; // Skip if this entry doesn't belong to current column family
+ }
+
+ DiffType diffType = diffEntry.getType();
+
+ switch (diffType) {
+ case CREATE:
+ case MODIFY:
+ // Key was created or modified - get current value and write to SST
+ byte[] currentValue = currentSnapshotDb.get(currentCf, key);
+ // A key with no value in the current snapshot DB is silently skipped.
+ if (currentValue != null) {
+ sstWriter.put(key, currentValue);
+ cfChanges++;
+
+ if (cfChanges % 10000 == 0) {
+ LOG.debug("Written {} changes to SST file for column family {} so far",
+ cfChanges, cfName);
+ }
+ }
+ break;
+
+ case DELETE:
+ // Key was deleted - write tombstone to SST
+ /* TODO: Sort keys before writing to SST file?
Caused by: org.rocksdb.RocksDBException: Keys must be added in strict ascending order.
at org.rocksdb.SstFileWriter.delete(Native Method)
at org.rocksdb.SstFileWriter.delete(SstFileWriter.java:178)
at org.apache.hadoop.hdds.utils.db.RDBSstFileWriter.delete(RDBSstFileWriter.java:65)
+ * */
+ sstWriter.delete(key);
+ cfChanges++;
+
+ if (cfChanges % 10000 == 0) {
+ LOG.debug("Written {} changes (including deletions) to SST file for column family {} so far",
+ cfChanges, cfName);
+ }
+ break;
+
+ case RENAME:
+ // Handle rename - delete old key and create new key
+ if (diffEntry.getTargetPath() != null) {
+ String targetPath = new String(diffEntry.getTargetPath(), StandardCharsets.UTF_8);
+ byte[] newKey = extractKeyFromPath(targetPath, cfName,
+ currentSnapshot.getVolumeName(), currentSnapshot.getBucketName());
+
+ if (newKey != null) {
+ // Delete old key
+ sstWriter.delete(key);
+
+ // Add new key with current value
+ byte[] newValue = currentSnapshotDb.get(currentCf, newKey);
+ if (newValue != null) {
+ sstWriter.put(newKey, newValue);
+ }
+ // NOTE(review): counted as 2 even when newValue is null and only the
+ // delete was written.
+ cfChanges += 2; // Count both delete and put
+ }
+ }
+ break;
+
+ default:
+ LOG.warn("Unknown diff type: {}, skipping entry", diffType);
+ break;
+ }
+ }
+
+ LOG.debug("Finished writing {} changes for column family: {} to SST file",
+ cfChanges, cfName);
+
+ } catch (Exception e) {
+ // NOTE(review): the error is only logged. The ingestion step below still runs
+ // when cfChanges > 0 and the file is non-empty, so an SST file whose writing
+ // was aborted midway may get ingested - confirm this is intended.
+ LOG.error("Error processing column family {} for snapshot {}: {}",
+ cfName, currentSnapshot.getName(), e.getMessage(), e);
+ }
+
+ // Ingest SST file into target database if there were changes
+ if (cfChanges > 0 && sstFile.exists() && sstFile.length() > 0) {
+ try {
+ targetTable.loadFromFile(sstFile);
+ LOG.info("Successfully ingested SST file for column family {}: {} changes",
+ cfName, cfChanges);
+ } catch (Exception e) {
+ // Ingestion failures are logged only; cfChanges is still added to the total below.
+ LOG.error("Failed to ingest SST file for column family {}: {}", cfName, e.getMessage(), e);
+ }
+ } else {
+ LOG.debug("No changes to ingest for column family {}", cfName);
+ }
+
+ // Clean up SST file after ingestion
+ try {
+ if (sstFile.exists()) {
+ Files.delete(sstFile.toPath());
+ LOG.debug("Cleaned up SST file: {}", sstFile.getAbsolutePath());
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to clean up SST file: {}", sstFile.getAbsolutePath(), e);
+ }
+
+ totalChanges += cfChanges;
+ LOG.debug("Applied {} incremental changes for column family: {}", cfChanges, cfName);
+ }
+
+
+// String lastToken = new String(diffList.get(diffList.size() - 1).getSourcePath(), StandardCharsets.UTF_8);
+ nextToken += diffList.size();
+ LOG.debug("Retrieving next page of snapshot diff with token: {}", nextToken);
+ diffResponse = ozoneManager.snapshotDiff(
+ currentSnapshot.getVolumeName(),
+ currentSnapshot.getBucketName(),
+ previousSnapshot.getName(),
+ currentSnapshot.getName(),
+ String.valueOf(nextToken), // token
+ 0, // pageSize
+ false, // forceFullDiff
+ false // disableNativeDiff
+ );
+
+ if (diffResponse.getJobStatus() != SnapshotDiffResponse.JobStatus.DONE) {
+ throw new RocksDatabaseException("Expecting DONE but got unexpected snapshot diff status: " +
+ diffResponse.getJobStatus() + ", reason: " + diffResponse.getReason());
+ }
+
+ // NOTE(review): getSnapshotDiffReport() is dereferenced here without the null check
+ // that is applied to the first page above.
+ LOG.info("Retrieved next page of snapshot diff, size: {}",
+ diffResponse.getSnapshotDiffReport().getDiffList().size());
+ diffReport = diffResponse.getSnapshotDiffReport();
+ }
+
+ // Clean up temporary directory
+ // (Files.deleteIfExists removes a directory only when it is empty; otherwise it
+ // throws and the warn branch below is taken)
+ try {
+ Files.deleteIfExists(Paths.get(tempSstDir));
+ LOG.debug("Cleaned up temporary SST directory: {}", tempSstDir);
+ } catch (IOException e) {
+ LOG.warn("Failed to clean up temporary SST directory: {}", tempSstDir, e);
+ }
+
+ } catch (IOException e) {
+ // NOTE(review): this catch covers the whole try block, including the paginated
+ // snapshotDiff call above, so the message may not match the actual failure - verify.
+ throw new RocksDatabaseException("Failed to create temporary SST directory: " + tempSstDir, e);
+ }
+
+ LOG.info("Applied {} total incremental changes using snapshotDiff approach", totalChanges);
+ return totalChanges;
+ }
+
+  /**
+   * Converts a path taken from a snapshot diff report into the DB key used by the
+   * given column family.
+   *
+   * @param path path from the diff report entry
+   * @param columnFamily column family the key is being built for
+   * @param volume volume name of the snapshot
+   * @param bucket bucket name of the snapshot
+   * @return UTF-8 bytes of the DB key; {@code null} when the column family is neither
+   *     {@code KEY_TABLE} nor {@code FILE_TABLE}, or when building the key fails
+   */
+  private byte[] extractKeyFromPath(String path, String columnFamily, String volume, String bucket) {
+    try {
+      final String dbKey;
+      if (KEY_TABLE.equals(columnFamily)) {
+        // keyTable keys are built via getOzoneKey(volume, bucket, path)
+        dbKey = ozoneManager.getMetadataManager().getOzoneKey(volume, bucket, path);
+      } else if (FILE_TABLE.equals(columnFamily)) { // TODO: FSO code path not tested
+        // fileTable keys are built via getOzoneKeyFSO(volume, bucket, path)
+        dbKey = ozoneManager.getMetadataManager().getOzoneKeyFSO(volume, bucket, path);
+      } else {
+        // No key mapping for this column family; the caller skips the entry.
+        return null;
+      }
+      return dbKey.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception ex) {
+      LOG.warn("Failed to extract key from path: {} for column family: {}, volume: {}, bucket: {}, error: {}",
+          path, columnFamily, volume, bucket, ex.getMessage(), ex);
+      return null;
+    }
+  }
+
+  /**
+   * Compares the original and the defragmented DB for every tracked column family:
+   * first the number of keys, then every key/value pair of the original DB against
+   * the defragmented DB (the pair-wise check of a column family stops once more than
+   * 10 errors were found).
+   *
+   * <p>The outcome is only logged; a failed verification does not throw.
+   *
+   * @param originalDb DB of the snapshot being defragmented
+   * @param defraggedDb defragmented DB to verify
+   * @param snapshotInfo snapshot the DBs belong to, used for logging
+   */
+  private void verifyDbIntegrity(RocksDatabase originalDb, RocksDatabase defraggedDb,
+      SnapshotInfo snapshotInfo) {
+
+    LOG.info("Starting DB integrity verification for snapshot: {}", snapshotInfo.getName());
+
+    boolean allChecksPassed = true;
+    long originalKeysTotal = 0;
+    long defraggedKeysTotal = 0;
+
+    for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+      LOG.debug("Verifying column family: {} for snapshot: {}",
+          cfName, snapshotInfo.getName());
+
+      final RocksDatabase.ColumnFamily sourceCf = originalDb.getColumnFamily(cfName);
+      final RocksDatabase.ColumnFamily targetCf = defraggedDb.getColumnFamily(cfName);
+
+      if (sourceCf == null) {
+        LOG.warn("Column family {} not found in original DB, skipping verification", cfName);
+        continue;
+      }
+
+      if (targetCf == null) {
+        LOG.error("Column family {} not found in defragmented DB, verification failed", cfName);
+        allChecksPassed = false;
+        continue;
+      }
+
+      try {
+        // Pass 1: number of keys in the original DB
+        long sourceCount = 0;
+        try (ManagedRocksIterator it = originalDb.newIterator(sourceCf)) {
+          for (it.get().seekToFirst(); it.get().isValid(); it.get().next()) {
+            sourceCount++;
+          }
+        }
+
+        // Pass 2: number of keys in the defragmented DB
+        long targetCount = 0;
+        try (ManagedRocksIterator it = defraggedDb.newIterator(targetCf)) {
+          for (it.get().seekToFirst(); it.get().isValid(); it.get().next()) {
+            targetCount++;
+          }
+        }
+
+        originalKeysTotal += sourceCount;
+        defraggedKeysTotal += targetCount;
+
+        if (sourceCount == targetCount) {
+          LOG.info("Key count verification passed for column family {}: {} keys",
+              cfName, sourceCount);
+        } else {
+          LOG.error("Key count mismatch for column family {}: original={}, defragmented={}",
+              cfName, sourceCount, targetCount);
+          allChecksPassed = false;
+        }
+
+        // Pass 3: look up every original key in the defragmented DB and compare values
+        long checked = 0;
+        long errors = 0;
+
+        try (ManagedRocksIterator it = originalDb.newIterator(sourceCf)) {
+          for (it.get().seekToFirst(); it.get().isValid(); it.get().next()) {
+            final byte[] key = it.get().key();
+            final byte[] expected = it.get().value();
+            final byte[] actual = defraggedDb.get(targetCf, key);
+
+            if (actual == null) {
+              LOG.error("Key missing in defragmented DB for column family {}: key #{}",
+                  cfName, checked);
+              allChecksPassed = false;
+              errors++;
+            } else if (!java.util.Arrays.equals(expected, actual)) {
+              LOG.error("Value mismatch for column family {}: key #{}",
+                  cfName, checked);
+              allChecksPassed = false;
+              errors++;
+            }
+
+            checked++;
+
+            // Progress is reported every 10,000 keys to keep the log readable
+            if (checked % 10000 == 0) {
+              LOG.debug("Full verification progress for column family {}: checked {} keys, {} errors so far",
+                  cfName, checked, errors);
+            }
+
+            // Give up on this column family after more than 10 errors
+            if (errors > 10) {
+              LOG.warn("Too many errors found during full verification for column family {}, stopping further checks",
+                  cfName);
+              break;
+            }
+          }
+        }
+
+        if (errors != 0) {
+          LOG.error("Full verification FAILED for column family {}: {} errors found out of {} keys checked",
+              cfName, errors, checked);
+        } else {
+          LOG.info("Full verification PASSED for column family {}: all {} keys verified successfully",
+              cfName, checked);
+        }
+
+      } catch (Exception ex) {
+        LOG.error("Error during verification of column family {} for snapshot {}: {}",
+            cfName, snapshotInfo.getName(), ex.getMessage(), ex);
+        allChecksPassed = false;
+      }
+    }
+
+    // Final summary across all column families
+    if (!allChecksPassed) {
+      LOG.error("DB integrity verification FAILED for snapshot: {} " +
+          "(total original keys: {}, total defragmented keys: {})",
+          snapshotInfo.getName(), originalKeysTotal, defraggedKeysTotal);
+      // Consider throwing an exception here if verification failure should halt the process
+      // throw new IOException("DB integrity verification failed for snapshot: " + snapshotInfo.getName());
+    } else {
+      LOG.info("DB integrity verification PASSED for snapshot: {} " +
+          "(total original keys: {}, total defragmented keys: {})",
+          snapshotInfo.getName(), originalKeysTotal, defraggedKeysTotal);
+    }
+  }
+
+  /**
+   * Records in the snapshot's local YAML file ({@code <snapshot path>.yaml}) that the
+   * snapshot no longer needs defragmentation.
+   *
+   * @param snapshotInfo snapshot whose local metadata is updated
+   * @throws IOException if the YAML file cannot be read or written
+   */
+  private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws IOException {
+    final String snapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), snapshotInfo);
+
+    LOG.info("Updating snapshot metadata for: {} at path: {}",
+        snapshotInfo.getName(), snapshotPath);
+
+    try {
+      // Load the local data, clear the needs-defragmentation flag, and write it back.
+      final File localDataFile = new File(snapshotPath + ".yaml");
+      final OmSnapshotLocalDataYaml localData = OmSnapshotLocalDataYaml.getFromYamlFile(localDataFile);
+      localData.setNeedsDefragmentation(false);
+      localData.writeToYaml(localDataFile);
+
+      LOG.info("Successfully updated metadata for snapshot: {}, " +
+          "marked as defragmented (needsCompaction=false)",
+          snapshotInfo.getName());
+
+    } catch (IOException ioe) {
+      LOG.error("Failed to update metadata for snapshot: {}", snapshotInfo.getName(), ioe);
+      throw ioe;
+    }
+  }
+
+  /**
+   * Background task that runs one snapshot defragmentation pass when the service
+   * run conditions are satisfied.
+   */
+  private final class SnapshotDefragTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      // Nothing to do unless shouldRun() allows it.
+      if (!shouldRun()) {
+        return EmptyTaskResult.newResult();
+      }
+
+      final long runNumber = runCount.incrementAndGet();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initiating Snapshot Defragmentation Task: run # {}", runNumber);
+      }
+      triggerSnapshotDefragOnce();
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  /**
+   * Runs one defragmentation pass: repeatedly picks the first snapshot that needs
+   * defragmentation and defragments it - fully when no previously defragmented
+   * snapshot is found for it, incrementally otherwise - and then updates the
+   * snapshot's local metadata.
+   *
+   * <p>Each snapshot is processed while holding the {@code SNAPSHOT_GC_LOCK} write
+   * lock. Every attempt counts against {@code snapshotLimitPerTask}, whether or not it
+   * succeeds, so a snapshot that keeps failing with an {@link OMException} cannot keep
+   * this loop spinning.
+   *
+   * @return {@code false} if the rocks-tools native library or the snapshot manager is
+   *     unavailable, or if an unexpected exception occurred while processing a
+   *     snapshot; {@code true} otherwise
+   * @throws IOException if an I/O error escapes the per-snapshot error handling
+   */
+  public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+    // Check if rocks-tools native lib is available
+    if (!isRocksToolsNativeLibAvailable()) {
+      LOG.warn("Rocks-tools native library is not available. " +
+          "Stopping SnapshotDefragService.");
+      return false;
+    }
+
+    // ozoneManager may be null (see shouldRun()), hence the explicit null check.
+    final OmSnapshotManager snapshotManager =
+        ozoneManager == null ? null : ozoneManager.getOmSnapshotManager();
+    if (snapshotManager == null) {
+      LOG.debug("OmSnapshotManager not available, skipping defragmentation task");
+      return false;
+    }
+
+    Table<String, SnapshotInfo> snapshotInfoTable =
+        ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+    long snapshotLimit = snapshotLimitPerTask;
+
+    while (snapshotLimit > 0 && running.get()) {
+      // Find the first snapshot needing defragmentation
+      SnapshotInfo snapshotToDefrag = findFirstSnapshotNeedingDefrag(snapshotInfoTable);
+
+      if (snapshotToDefrag == null) {
+        LOG.info("No snapshots found needing defragmentation");
+        break;
+      }
+
+      // Acquire SNAPSHOT_GC_LOCK
+      final String lockResource = "defrag-" + snapshotToDefrag.getSnapshotId();
+      OMLockDetails gcLockDetails = ozoneManager.getMetadataManager().getLock()
+          .acquireWriteLock(SNAPSHOT_GC_LOCK, lockResource);
+
+      if (!gcLockDetails.isLockAcquired()) {
+        LOG.warn("Failed to acquire SNAPSHOT_GC_LOCK for snapshot: {}",
+            snapshotToDefrag.getName());
+        break;
+      }
+
+      // Count the attempt up front. Previously the limit was only decremented on
+      // success, so an OMException left the loop condition unchanged and the same
+      // snapshot could be picked again and again.
+      snapshotLimit--;
+
+      try {
+        LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
+            snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+        // Get snapshot through SnapshotCache for proper locking
+        try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
+            snapshotManager.getActiveSnapshot(
+                snapshotToDefrag.getVolumeName(),
+                snapshotToDefrag.getBucketName(),
+                snapshotToDefrag.getName())) {
+
+          OmSnapshot omSnapshot = snapshotSupplier.get();
+
+          // Check if this is the first snapshot in the chain
+          SnapshotInfo previousDefraggedSnapshot = findPreviousDefraggedSnapshot(
+              snapshotToDefrag, snapshotInfoTable);
+
+          if (previousDefraggedSnapshot == null) {
+            LOG.info("Performing full defragmentation for first snapshot: {}",
+                snapshotToDefrag.getName());
+            performFullDefragmentation(snapshotToDefrag, omSnapshot);
+          } else {
+            LOG.info("Performing incremental defragmentation for snapshot: {} " +
+                "based on previous defragmented snapshot: {}",
+                snapshotToDefrag.getName(), previousDefraggedSnapshot.getName());
+            performIncrementalDefragmentation(snapshotToDefrag,
+                previousDefraggedSnapshot, omSnapshot);
+          }
+
+          // Update snapshot metadata
+          updateSnapshotMetadata(snapshotToDefrag);
+
+          // Close and evict the original snapshot DB from SnapshotCache
+          // TODO: Implement proper eviction from SnapshotCache
+          LOG.info("Defragmentation completed for snapshot: {}",
+              snapshotToDefrag.getName());
+
+          snapshotsDefraggedCount.getAndIncrement();
+
+        } catch (OMException ome) {
+          if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+            LOG.info("Snapshot {} was deleted during defragmentation",
+                snapshotToDefrag.getName());
+          } else {
+            LOG.error("OMException during snapshot defragmentation for: {}",
+                snapshotToDefrag.getName(), ome);
+          }
+        }
+
+      } catch (Exception e) {
+        LOG.error("Exception during snapshot defragmentation for: {}",
+            snapshotToDefrag.getName(), e);
+        return false;
+      } finally {
+        // Release SNAPSHOT_GC_LOCK
+        ozoneManager.getMetadataManager().getLock()
+            .releaseWriteLock(SNAPSHOT_GC_LOCK, lockResource);
+        LOG.debug("Released SNAPSHOT_GC_LOCK for snapshot: {}", snapshotToDefrag.getName());
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Builds the task queue for one service run: a single {@code SnapshotDefragTask}.
+   */
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    final BackgroundTaskQueue tasks = new BackgroundTaskQueue();
+    tasks.add(new SnapshotDefragTask());
+    return tasks;
+  }
+
+  /**
+   * Tells whether the defragmentation task may run now.
+   *
+   * @return {@code true} when there is no OzoneManager (testing), or when the Ratis
+   *     server is initialized, this service is running and the OM is running;
+   *     {@code false} otherwise
+   */
+  private boolean shouldRun() {
+    // OzoneManager can be null for testing
+    if (ozoneManager == null) {
+      return true;
+    }
+    if (ozoneManager.getOmRatisServer() != null) {
+      // The service only runs if current OM node is ready
+      return running.get() && ozoneManager.isRunning();
+    }
+    LOG.warn("OzoneManagerRatisServer is not initialized yet");
+    return false;
+  }
+
+  /**
+   * @return the counter that is incremented once per successfully defragmented snapshot
+   */
+  public AtomicLong getSnapshotsDefraggedCount() {
+    return snapshotsDefraggedCount;
+  }
+
+  /**
+   * @return the bootstrap state lock held by this service
+   */
+  @Override
+  public BootstrapStateHandler.Lock getBootstrapStateLock() {
+    return lock;
+  }
+
+  /**
+   * Clears the {@code running} flag, which ends the defragmentation loop at its next
+   * check, and then shuts down the underlying background service.
+   */
+  @Override
+  public void shutdown() {
+    running.set(false);
+    super.shutdown();
+  }
+}
+
From 08cde3dd3a87938f12ca8b79ac625b2478f87987 Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Tue, 7 Oct 2025 13:32:31 -0700
Subject: [PATCH 07/17] Make it compile
---
.../org/apache/hadoop/hdds/utils/db/RocksDatabase.java | 6 +++---
.../apache/hadoop/ozone/om/SnapshotDefragService.java | 10 ++++++----
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 9eeb69ece3d8..30622c8e36ff 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -237,7 +237,7 @@ public boolean isClosed() {
*
* @see ManagedCheckpoint
*/
- final class RocksCheckpoint implements Closeable {
+ public final class RocksCheckpoint implements Closeable {
private final ManagedCheckpoint checkpoint;
private RocksCheckpoint() {
@@ -609,7 +609,7 @@ public List getLiveFilesMetaData() throws RocksDatabaseExcepti
}
}
- RocksCheckpoint createCheckpoint() {
+ public RocksCheckpoint createCheckpoint() {
return new RocksCheckpoint();
}
@@ -660,7 +660,7 @@ public Collection getExtraColumnFamilies() {
return Collections.unmodifiableCollection(columnFamilies.values());
}
- byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException {
+ public byte[] get(ColumnFamily family, byte[] key) throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
return db.get().get(family.getHandle(), key);
} catch (RocksDBException e) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
index 3e0195715518..3030e7da79fd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -151,7 +151,8 @@ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
try {
// Read YAML metadata using the correct API
File yamlFile = new File(snapshotPath + ".yaml");
- OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile);
+ OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(
+ ozoneManager.getOmSnapshotManager(), yamlFile); // TODO: Verify new usage
// Check if snapshot needs compaction (defragmentation)
boolean needsDefrag = yamlData.getNeedsDefrag();
@@ -955,13 +956,14 @@ private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws IOExceptio
try {
// Read current YAML data using the correct API
File yamlFile = new File(snapshotPath + ".yaml");
- OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(yamlFile);
+ OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile(
+ ozoneManager.getOmSnapshotManager(), yamlFile); // TODO: Verify new usage
// Mark as defragmented by setting needsCompaction to false
- yamlData.setNeedsDefragmentation(false);
+ yamlData.setNeedsDefrag(false);
// Write updated YAML data
- yamlData.writeToYaml(yamlFile);
+ yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile); // TODO: Verify new usage
LOG.info("Successfully updated metadata for snapshot: {}, " +
"marked as defragmented (needsCompaction=false)",
From 2cd2cbffa646b5de556cd5658e4a0d02b177d941 Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Tue, 7 Oct 2025 13:36:41 -0700
Subject: [PATCH 08/17] Checkstyle
---
.../java/org/apache/hadoop/ozone/om/SnapshotDefragService.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
index 3030e7da79fd..9b01ff93b8ad 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java
@@ -590,7 +590,8 @@ private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore ta
}
// TODO: Handle pagination when diffList is bigger than server page size
- // 2025-08-16 09:10:52,500 [IPC Server handler 1 on default port 9862] INFO om.SnapshotDefragService: Found 1000 differences to process
+ // 2025-08-16 09:10:52,500 [IPC Server handler 1 on default port 9862] INFO
+ // om.SnapshotDefragService: Found 1000 differences to process
LOG.info("Found {} differences to process", diffReport.getDiffList().size());
// Get table references for target database
From 6a8adb5697932df54664ef04bce77aa973675346 Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Mon, 18 Aug 2025 21:16:52 -0700
Subject: [PATCH 09/17] Add a test case for debugging
(cherry picked from commit ab9d67ee0ab4f05730ff50dbcd7fc394639c9e40)
---
.../snapshot/TestSnapshotDefragService2.java | 413 ++++++++++++++++++
1 file changed, 413 insertions(+)
create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java
new file mode 100644
index 000000000000..cf573e912454
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDefragService2.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static org.apache.ozone.test.LambdaTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.SnapshotDefragService;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+/**
+ * Test SnapshotDefragService functionality using MiniOzoneCluster.
+ * TODO: Rename to TestSnapshotDefragService.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class TestSnapshotDefragService2 {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotDefragService2.class);
+
+ private MiniOzoneCluster cluster;
+ private OzoneClient client;
+ private ObjectStore store;
+ private OzoneManager ozoneManager;
+ private SnapshotDefragService defragService;
+
+ @BeforeAll
+ void setup() throws Exception {
+ // Enable debug logging for SnapshotDefragService
+ GenericTestUtils.setLogLevel(LoggerFactory.getLogger(SnapshotDefragService.class), Level.DEBUG);
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 30, TimeUnit.SECONDS);
+ conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
+ conf.setBoolean(OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
+ conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, BucketLayout.OBJECT_STORE.name());
+// conf.setInt(OMStorage.TESTING_INIT_LAYOUT_VERSION_KEY,
+// OMLayoutFeature.SNAPSHOT_DEFRAGMENTATION.layoutVersion());
+
+ conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, -1);
+// conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT, 300);
+ conf.setInt(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, 5);
+
+ conf.setQuietMode(false);
+
+ // Create MiniOzoneCluster
+ cluster = MiniOzoneCluster.newBuilder(conf).build();
+ cluster.waitForClusterToBeReady();
+
+ client = cluster.newClient();
+ store = client.getObjectStore();
+ ozoneManager = cluster.getOzoneManager();
+
+ // Create SnapshotDefragService for manual triggering
+ defragService = new SnapshotDefragService(
+ 10000, // interval
+ TimeUnit.MILLISECONDS,
+ 30000, // service timeout
+ ozoneManager,
+ conf
+ );
+ }
+
+  /**
+   * Releases what {@link #setup()} created: the defrag service, then the client, then the cluster.
+   * Each step runs even if an earlier one throws, so a failing shutdown cannot leave the
+   * mini cluster running for the rest of the test JVM.
+   *
+   * @throws Exception if any of the shutdown steps fails
+   */
+  @AfterAll
+  public void cleanup() throws Exception {
+    try {
+      if (defragService != null) {
+        defragService.shutdown();
+      }
+    } finally {
+      try {
+        if (client != null) {
+          client.close();
+        }
+      } finally {
+        if (cluster != null) {
+          cluster.shutdown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes {@code keyCount} keys named {@code key-0} .. {@code key-(keyCount - 1)} into the given
+   * bucket, each holding 100 random alphabetic characters.
+   *
+   * <p>Key names do not vary between calls, so calling this again on the same bucket overwrites
+   * the earlier keys instead of adding new ones.
+   *
+   * @param volumeName volume that contains the bucket
+   * @param bucketName bucket to write to
+   * @param keyCount number of keys to write
+   * @throws Exception if the volume or bucket lookup, or a key write, fails
+   */
+  private void createKeys(String volumeName, String bucketName, int keyCount) throws Exception {
+    OzoneBucket bucket = store.getVolume(volumeName).getBucket(bucketName);
+
+    for (int i = 0; i < keyCount; i++) {
+      String keyName = "key-" + i;
+      // Encode once with an explicit charset so the declared key size always equals the number
+      // of bytes written, independent of the platform default charset.
+      byte[] payload = RandomStringUtils.randomAlphabetic(100)
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+      try (OzoneOutputStream outputStream = bucket.createKey(keyName, payload.length,
+          StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
+          java.util.Collections.emptyMap(), java.util.Collections.emptyMap())) {
+        outputStream.write(payload);
+      }
+    }
+    LOG.info("Created {} keys in bucket {}/{}", keyCount, volumeName, bucketName);
+  }
+
+  /**
+   * Creates a snapshot and blocks until it is visible both in the snapshot info table and as a
+   * new checkpoint directory on disk.
+   *
+   * @param volumeName volume of the bucket to snapshot
+   * @param bucketName bucket to snapshot
+   * @param snapshotName name of the new snapshot
+   * @throws Exception if snapshot creation or either wait fails
+   */
+  private void createSnapshot(String volumeName, String bucketName, String snapshotName)
+      throws Exception {
+    // Record the checkpoint directories present now so the new one can be told apart afterwards.
+    Set<String> existingCheckpoints = getExistingCheckpointDirectories();
+
+    store.createSnapshot(volumeName, bucketName, snapshotName);
+
+    // Wait for the snapshot to show up in the snapshot info table.
+    String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return ozoneManager.getMetadataManager()
+            .getSnapshotInfoTable()
+            .get(snapshotTableKey) != null;
+      } catch (IOException e) {
+        // A failed read is treated as "not there yet"; the next poll retries.
+        return false;
+      }
+    }, 1000, 30000);
+
+    // Wait for the checkpoint DB to be created
+    waitForCheckpointDB(snapshotName, existingCheckpoints);
+
+    LOG.info("Created snapshot: {}", snapshotName);
+  }
+
+  /**
+   * Collects the names of the directories currently present under
+   * {@code <metadata dir>/db.snapshots/checkpointState}, where the metadata dir is read from
+   * {@code ozone.om.db.dirs}, falling back to {@code ozone.metadata.dirs}.
+   *
+   * @return the directory names; empty when the checkpointState directory does not exist or
+   *     cannot be listed
+   */
+  private Set<String> getExistingCheckpointDirectories() {
+    String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs");
+    if (metadataDir == null) {
+      metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs");
+    }
+
+    String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState";
+    File checkpointParentDir = new File(checkpointStateDir);
+
+    Set<String> existingDirs = new java.util.HashSet<>();
+    if (checkpointParentDir.exists()) {
+      // listFiles returns null when the path cannot be listed; treat that as "no directories".
+      File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory);
+      if (checkpointDirs != null) {
+        for (File dir : checkpointDirs) {
+          existingDirs.add(dir.getName());
+        }
+      }
+    }
+
+    LOG.debug("Existing checkpoint directories: {}", existingDirs);
+    return existingDirs;
+  }
+
+  /**
+   * Blocks until a checkpoint directory that did not exist before the snapshot was created
+   * contains at least one database file (an {@code .sst} file, {@code CURRENT}, or a file whose
+   * name starts with {@code MANIFEST}). Waits up to 60 seconds.
+   *
+   * @param snapshotName snapshot name, used only in log messages
+   * @param existingCheckpoints names of the checkpoint directories that predate the snapshot
+   * @throws Exception if the wait fails
+   */
+  private void waitForCheckpointDB(String snapshotName, Set<String> existingCheckpoints) throws Exception {
+    String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs");
+    if (metadataDir == null) {
+      metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs");
+    }
+
+    String checkpointStateDir = metadataDir + "/db.snapshots/checkpointState";
+    File checkpointParentDir = new File(checkpointStateDir);
+
+    LOG.info("Waiting for new checkpoint DB to be created under: {}", checkpointStateDir);
+
+    GenericTestUtils.waitFor(() -> {
+      if (!checkpointParentDir.exists()) {
+        LOG.debug("CheckpointState directory does not exist yet: {}", checkpointStateDir);
+        return false;
+      }
+
+      File[] checkpointDirs = checkpointParentDir.listFiles(File::isDirectory);
+      if (checkpointDirs == null || checkpointDirs.length == 0) {
+        LOG.debug("No checkpoint directories found in: {}", checkpointStateDir);
+        return false;
+      }
+
+      for (File checkpointDir : checkpointDirs) {
+        // Only directories that appeared after the snapshot was requested are of interest.
+        if (existingCheckpoints.contains(checkpointDir.getName())) {
+          continue;
+        }
+
+        if (containsDbFile(checkpointDir)) {
+          LOG.info("New checkpoint DB found for snapshot {} in directory: {}",
+              snapshotName, checkpointDir.getAbsolutePath());
+          return true;
+        }
+
+        LOG.debug("New checkpoint directory found but no DB files yet: {}", checkpointDir.getAbsolutePath());
+      }
+
+      LOG.debug("Waiting for new checkpoint DB files to appear in checkpointState directories");
+      return false;
+    }, 1000, 60000); // Wait up to 60 seconds for checkpoint DB creation
+
+    LOG.info("Checkpoint DB created successfully for snapshot: {}", snapshotName);
+  }
+
+  /**
+   * Tells whether {@code dir} directly contains a regular file that looks like a database file:
+   * one ending in {@code .sst}, named {@code CURRENT}, or starting with {@code MANIFEST}.
+   */
+  private static boolean containsDbFile(File dir) {
+    File[] children = dir.listFiles();
+    if (children == null) {
+      return false;
+    }
+    for (File child : children) {
+      String name = child.getName();
+      boolean looksLikeDbFile =
+          name.endsWith(".sst") || name.equals("CURRENT") || name.startsWith("MANIFEST");
+      if (child.isFile() && looksLikeDbFile) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Logs every file and directory under {@code <metadata dir>/db.snapshots}, sorted by path,
+   * with paths shown relative to that directory and sizes shown for files.
+   *
+   * <p>I/O errors while walking the tree are logged and not rethrown.
+   *
+   * @param description heading logged before the listing
+   */
+  private void printSnapshotDirectoryListing(String description) {
+    LOG.info("=== {} ===", description);
+    String metadataDir = ozoneManager.getConfiguration().get("ozone.om.db.dirs");
+    if (metadataDir == null) {
+      metadataDir = ozoneManager.getConfiguration().get("ozone.metadata.dirs");
+    }
+
+    String snapshotDir = metadataDir + "/db.snapshots";
+    File snapshotsDir = new File(snapshotDir);
+
+    if (!snapshotsDir.exists()) {
+      LOG.info("Snapshots directory does not exist: {}", snapshotDir);
+      return;
+    }
+
+    // Resolve the root once; every entry is reported relative to it.
+    Path snapshotRoot = Paths.get(snapshotDir);
+    // Files.walk holds open directory handles, so the stream must be closed.
+    try (Stream<Path> paths = Files.walk(snapshotRoot)) {
+      paths.sorted()
+          .forEach(path -> {
+            File file = path.toFile();
+            String relativePath = snapshotRoot.relativize(path).toString();
+            if (file.isDirectory()) {
+              // The root relativizes to an empty string; show it as ".".
+              LOG.info("Directory: {}/", relativePath.isEmpty() ? "." : relativePath);
+            } else {
+              LOG.info("File: {} (size: {} bytes)", relativePath, file.length());
+            }
+          });
+    } catch (IOException e) {
+      LOG.error("Error listing snapshot directory: {}", snapshotDir, e);
+    }
+  }
+
+  /**
+   * Flags a snapshot as needing defragmentation by rewriting its local YAML metadata file
+   * ({@code <snapshot path>.yaml}). Logs a warning and changes nothing when that file is missing.
+   *
+   * <p>TODO: This is a workaround as the current logic does not automatically mark snapshots
+   * as needing defragmentation.
+   * NOTE(review): the original comment also says "This is not needed." - confirm whether the
+   * workaround can be dropped.
+   *
+   * @param snapshotInfo snapshot whose metadata file is updated
+   * @throws IOException if reading or writing the YAML file fails
+   */
+  private void markSnapshotAsNeedingDefragmentation(SnapshotInfo snapshotInfo) throws IOException {
+    File yamlFile = new File(
+        OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo) + ".yaml");
+
+    if (!yamlFile.exists()) {
+      LOG.warn("YAML file not found for snapshot {}: {}", snapshotInfo.getName(), yamlFile.getPath());
+      return;
+    }
+
+    // Load, flip the flag, and write the metadata back to the same file.
+    OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+    OmSnapshotLocalDataYaml localData = OmSnapshotLocalDataYaml.getFromYamlFile(snapshotManager, yamlFile);
+    localData.setNeedsDefrag(true);
+    localData.writeToYaml(snapshotManager, yamlFile);
+    LOG.info("Marked snapshot {} as needing defragmentation", snapshotInfo.getName());
+  }
+
+  /**
+   * Flags every snapshot in the snapshot info table as needing defragmentation, starts the
+   * defrag service, and waits up to 30 seconds for its defragmented-snapshot counter to rise
+   * above the value it had before the service was started.
+   *
+   * <p>A timeout is only logged as a warning, so this method returns normally even when no
+   * snapshot was defragmented.
+   *
+   * @throws Exception if flagging the snapshots fails, or the wait fails for a reason other
+   *     than the timeout
+   */
+  private void triggerSnapshotDefragService() throws Exception {
+    LOG.info("Triggering SnapshotDefragService...");
+
+    // Mark all snapshots as needing defragmentation first
+    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iterator =
+        metadataManager.getSnapshotInfoTable().iterator()) {
+      iterator.seekToFirst();
+      while (iterator.hasNext()) {
+        Table.KeyValue<String, SnapshotInfo> entry = iterator.next();
+        markSnapshotAsNeedingDefragmentation(entry.getValue());
+      }
+    }
+
+    // Baseline for the progress check below.
+    long initialDefragCount = defragService.getSnapshotsDefraggedCount().get();
+    LOG.info("Initial defragmented count: {}", initialDefragCount);
+
+    // Start the service
+    defragService.start();
+
+    // Wait for the service to process snapshots
+    try {
+      await(30000, 1000, () -> {
+        long currentCount = defragService.getSnapshotsDefraggedCount().get();
+        LOG.info("Current defragmented count: {}", currentCount);
+        return currentCount > initialDefragCount;
+      });
+    } catch (TimeoutException e) {
+      // Deliberately best-effort: this is a debugging test, so a slow or idle service
+      // must not fail it.
+      LOG.warn("Timeout waiting for defragmentation to complete, continuing with test");
+    }
+
+    LOG.info("SnapshotDefragService execution completed. Snapshots defragmented: {}",
+        defragService.getSnapshotsDefraggedCount().get());
+  }
+
+ @Test
+ public void testSnapshotDefragmentation() throws Exception {
+ String volumeName = "test-volume";
+ String bucketName = "test-bucket";
+
+ // Create volume and bucket
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ // TODO: Test FSO bucket as well, default is LEGACY / OBJECT_STORE
+ volume.createBucket(bucketName);
+
+ LOG.info("Starting snapshot defragmentation test...");
+
+ // Print initial state
+ printSnapshotDirectoryListing("Initial state - no snapshots");
+
+ // Step 1: Create 2 keys, then create snap-1
+ createKeys(volumeName, bucketName, 2);
+ createSnapshot(volumeName, bucketName, "snap-1");
+ printSnapshotDirectoryListing("After creating snap-1 (2 keys)");
+
+ // Step 2: Create 2 more keys, then create snap-2
+ createKeys(volumeName, bucketName, 2); // TODO: This actually overwrites the previous keys
+ createSnapshot(volumeName, bucketName, "snap-2");
+ printSnapshotDirectoryListing("After creating snap-2 (4 keys total)");
+
+ // Step 3: Create 2 more keys, then create snap-3
+ createKeys(volumeName, bucketName, 2); // TODO: This actually overwrites the previous keys
+ createSnapshot(volumeName, bucketName, "snap-3");
+ printSnapshotDirectoryListing("After creating snap-3 (6 keys total)");
+
+ // Step 4: Trigger SnapshotDefragService
+ triggerSnapshotDefragService();
+ printSnapshotDirectoryListing("After SnapshotDefragService execution");
+
+ // Verify that the snapshots still exist
+ SnapshotInfo snap1 = ozoneManager.getMetadataManager()
+ .getSnapshotInfoTable()
+ .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-1"));
+ SnapshotInfo snap2 = ozoneManager.getMetadataManager()
+ .getSnapshotInfoTable()
+ .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-2"));
+ SnapshotInfo snap3 = ozoneManager.getMetadataManager()
+ .getSnapshotInfoTable()
+ .get(SnapshotInfo.getTableKey(volumeName, bucketName, "snap-3"));
+
+ assertNotNull(snap1, "Snapshot snap-1 should exist");
+ assertNotNull(snap2, "Snapshot snap-2 should exist");
+ assertNotNull(snap3, "Snapshot snap-3 should exist");
+
+ LOG.info("Test completed successfully");
+ }
+}
From 0a131dd8db5e07f1c3ac2ba639cad3095af77de2 Mon Sep 17 00:00:00 2001
From: Siyao Meng <50227127+smengcl@users.noreply.github.com>
Date: Thu, 9 Oct 2025 01:04:15 -0700
Subject: [PATCH 10/17] Handle yaml previousSnapshotId null
---
.../org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
index 3a80915e6eac..bc7d5b684553 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotLocalDataYaml.java
@@ -227,7 +227,8 @@ private final class ConstructSnapshotLocalData extends AbstractConstruct {
public Object construct(Node node) {
MappingNode mnode = (MappingNode) node;
Map