diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 0bfa98f991b9..1e8df1c67470 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4853,4 +4853,9 @@
10000
Maximum number of lock objects that could be present in the pool.
+
+ ozone.om.snapshot.local.data.manager.service.interval
+ 5m
+ Interval for cleaning up orphan snapshot local data versions corresponding to snapshots
+
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 0828be91ed4c..469900aa8ea7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -678,6 +678,9 @@ public final class OMConfigKeys {
public static final String OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT =
"ozone.om.hierarchical.resource.locks.hard.limit";
public static final int OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT_DEFAULT = 10000;
+ public static final String OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL =
+ "ozone.om.snapshot.local.data.manager.service.interval";
+ public static final String OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT = "5m";
/**
* Never constructed.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 0954b029ab67..4fcb8ed22e8c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -196,9 +196,10 @@ public final class OmSnapshotManager implements AutoCloseable {
private final AtomicInteger inFlightSnapshotCount = new AtomicInteger(0);
public OmSnapshotManager(OzoneManager ozoneManager) throws IOException {
- this.snapshotLocalDataManager = new OmSnapshotLocalDataManager(ozoneManager.getMetadataManager());
- boolean isFilesystemSnapshotEnabled =
- ozoneManager.isFilesystemSnapshotEnabled();
+ OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager();
+ this.snapshotLocalDataManager = new OmSnapshotLocalDataManager(ozoneManager.getMetadataManager(),
+ omMetadataManager.getSnapshotChainManager(), ozoneManager.getConfiguration());
+ boolean isFilesystemSnapshotEnabled = ozoneManager.isFilesystemSnapshotEnabled();
LOG.info("Ozone filesystem snapshot feature is {}.",
isFilesystemSnapshotEnabled ? "enabled" : "disabled");
@@ -344,6 +345,16 @@ public OmSnapshotManager(OzoneManager ozoneManager) throws IOException {
}
}
+ public static boolean isSnapshotPurged(SnapshotChainManager chainManager, OMMetadataManager omMetadataManager,
+ UUID snapshotId, TransactionInfo transactionInfo) throws IOException {
+ String tableKey = chainManager.getTableKey(snapshotId);
+ if (tableKey == null) {
+ return true;
+ }
+ return !omMetadataManager.getSnapshotInfoTable().isExist(tableKey) && transactionInfo != null &&
+ isTransactionFlushedToDisk(omMetadataManager, transactionInfo);
+ }
+
/**
* Help reject OM startup if snapshot feature is disabled
* but there are snapshots remaining in this OM. Note: snapshots that are
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
index 33caddc92327..70955fa05783 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.om.snapshot;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml.YAML_FILE_EXTENSION;
import com.google.common.annotations.VisibleForTesting;
@@ -41,10 +43,13 @@
import java.util.Stack;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -52,6 +57,7 @@
import org.apache.hadoop.ozone.om.OmSnapshotLocalData.VersionMeta;
import org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.lock.FlatResource;
import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
@@ -73,6 +79,7 @@
public class OmSnapshotLocalDataManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(OmSnapshotLocalDataManager.class);
+ private static final String LOCAL_DATA_MANAGER_SERVICE_NAME = "OmSnapshotLocalDataManagerService";
private final ObjectSerializer snapshotLocalDataSerializer;
// In-memory DAG of snapshot-version dependencies. Each node represents a
@@ -101,8 +108,13 @@ public class OmSnapshotLocalDataManager implements AutoCloseable {
private final ReadWriteLock internalLock;
// Locks should be always acquired by iterating through the snapshot chain to avoid deadlocks.
private HierarchicalResourceLockManager locks;
+ private Map snapshotToBeCheckedForOrphans;
+ private Scheduler scheduler;
+ private volatile boolean closed;
- public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager) throws IOException {
+ public OmSnapshotLocalDataManager(OMMetadataManager omMetadataManager,
+ SnapshotChainManager snapshotChainManager,
+ OzoneConfiguration configuration) throws IOException {
this.localDataGraph = GraphBuilder.directed().build();
this.omMetadataManager = omMetadataManager;
this.snapshotLocalDataSerializer = new YamlSerializer(
@@ -116,7 +128,7 @@ public void computeAndSetChecksum(Yaml yaml, OmSnapshotLocalData data) throws IO
this.versionNodeMap = new ConcurrentHashMap<>();
this.fullLock = new ReentrantReadWriteLock();
this.internalLock = new ReentrantReadWriteLock();
- init();
+ init(configuration, snapshotChainManager);
}
@VisibleForTesting
@@ -216,7 +228,7 @@ private LocalDataVersionNode getVersionNode(UUID snapshotId, int version) {
private void addSnapshotVersionMeta(UUID snapshotId, SnapshotVersionsMeta snapshotVersionsMeta)
throws IOException {
- if (!versionNodeMap.containsKey(snapshotId)) {
+ if (!versionNodeMap.containsKey(snapshotId) && !snapshotVersionsMeta.getSnapshotVersions().isEmpty()) {
for (LocalDataVersionNode versionNode : snapshotVersionsMeta.getSnapshotVersions().values()) {
validateVersionAddition(versionNode);
LocalDataVersionNode previousVersionNode =
@@ -261,8 +273,33 @@ void addVersionNodeWithDependents(OmSnapshotLocalData snapshotLocalData) throws
}
}
- private void init() throws IOException {
+ private void incrementOrphanCheckCount(UUID snapshotId) {
+ if (snapshotId != null) {
+ this.snapshotToBeCheckedForOrphans.compute(snapshotId, (k, v) -> v == null ? 1 : (v + 1));
+ }
+ }
+
+ private void decrementOrphanCheckCount(UUID snapshotId, int decrementBy) {
+ this.snapshotToBeCheckedForOrphans.compute(snapshotId, (k, v) -> {
+ if (v == null) {
+ return null;
+ }
+ int newValue = v - decrementBy;
+ if (newValue <= 0) {
+ return null;
+ }
+ return newValue;
+ });
+ }
+
+ @VisibleForTesting
+ Map getSnapshotToBeCheckedForOrphans() {
+ return snapshotToBeCheckedForOrphans;
+ }
+
+ private void init(OzoneConfiguration configuration, SnapshotChainManager chainManager) throws IOException {
this.locks = omMetadataManager.getHierarchicalLockManager();
+ this.snapshotToBeCheckedForOrphans = new ConcurrentHashMap<>();
RDBStore store = (RDBStore) omMetadataManager.getStore();
String checkpointPrefix = store.getDbLocation().getName();
File snapshotDir = new File(store.getSnapshotsParentDir());
@@ -283,6 +320,74 @@ private void init() throws IOException {
}
addVersionNodeWithDependents(snapshotLocalData);
}
+ for (UUID snapshotId : versionNodeMap.keySet()) {
+ incrementOrphanCheckCount(snapshotId);
+ }
+ long snapshotLocalDataManagerServiceInterval = configuration.getTimeDuration(
+ OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL,
+ OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ if (snapshotLocalDataManagerServiceInterval > 0) {
+ this.scheduler = new Scheduler(LOCAL_DATA_MANAGER_SERVICE_NAME, true, 1);
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+ try {
+ checkOrphanSnapshotVersions(omMetadataManager, chainManager);
+ } catch (Exception e) {
+ LOG.error("Exception while checking orphan snapshot versions", e);
+ }
+ }, snapshotLocalDataManagerServiceInterval, snapshotLocalDataManagerServiceInterval, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ private void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager)
+ throws IOException {
+ for (Map.Entry entry : snapshotToBeCheckedForOrphans.entrySet()) {
+ UUID snapshotId = entry.getKey();
+ int countBeforeCheck = entry.getValue();
+ checkOrphanSnapshotVersions(metadataManager, chainManager, snapshotId);
+ decrementOrphanCheckCount(snapshotId, countBeforeCheck);
+ }
+ }
+
+ @VisibleForTesting
+ void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChainManager chainManager,
+ UUID snapshotId) throws IOException {
+ LOG.info("Checking orphan snapshot versions for snapshot {}", snapshotId);
+ try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider = new WritableOmSnapshotLocalDataProvider(
+ snapshotId)) {
+ OmSnapshotLocalData snapshotLocalData = snapshotLocalDataProvider.getSnapshotLocalData();
+ boolean isSnapshotPurged = OmSnapshotManager.isSnapshotPurged(chainManager, metadataManager, snapshotId,
+ snapshotLocalData.getTransactionInfo());
+ for (Map.Entry integerLocalDataVersionNodeEntry : getVersionNodeMap()
+ .get(snapshotId).getSnapshotVersions().entrySet()) {
+ LocalDataVersionNode versionEntry = integerLocalDataVersionNodeEntry.getValue();
+ // remove the version entry if it is not referenced by any other snapshot version node. For version node 0
+ // a newly created snapshot version could point to a version with indegree 0 in such a scenario a version 0
+ // node can be only deleted if the snapshot is also purged.
+ internalLock.readLock().lock();
+ try {
+ boolean toRemove = localDataGraph.inDegree(versionEntry) == 0
+ && ((versionEntry.getVersion() != 0 && versionEntry.getVersion() != snapshotLocalData.getVersion())
+ || isSnapshotPurged);
+ if (toRemove) {
+ LOG.info("Removing snapshot Id : {} version: {} from local data, snapshotLocalDataVersion : {}, " +
+ "snapshotPurged: {}, inDegree : {}", snapshotId, versionEntry.getVersion(),
+ snapshotLocalData.getVersion(), isSnapshotPurged, localDataGraph.inDegree(versionEntry));
+ snapshotLocalDataProvider.removeVersion(versionEntry.getVersion());
+ }
+ } finally {
+ internalLock.readLock().unlock();
+ }
+ }
+ // If Snapshot is purged but not flushed completely to disk then this needs to wait for the next iteration
+ // which can be done by incrementing the orphan check count for the snapshotId.
+ if (!snapshotLocalData.getVersionSstFileInfos().isEmpty() && snapshotLocalData.getTransactionInfo() != null) {
+ incrementOrphanCheckCount(snapshotId);
+ }
+ snapshotLocalDataProvider.commit();
+ }
}
/**
@@ -326,13 +431,19 @@ private void validateVersionAddition(LocalDataVersionNode versionNode) throws IO
}
@Override
- public void close() {
- if (snapshotLocalDataSerializer != null) {
- try {
- snapshotLocalDataSerializer.close();
- } catch (IOException e) {
- LOG.error("Failed to close snapshot local data serializer", e);
+ public synchronized void close() {
+ if (!closed) {
+ if (snapshotLocalDataSerializer != null) {
+ try {
+ snapshotLocalDataSerializer.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close snapshot local data serializer", e);
+ }
}
+ if (scheduler != null) {
+ scheduler.close();
+ }
+ closed = true;
}
}
@@ -730,30 +841,66 @@ public synchronized void commit() throws IOException {
// Need to update the disk state if and only if the dirty bit is set.
if (isDirty()) {
String filePath = getSnapshotLocalPropertyYamlPath(super.snapshotId);
- String tmpFilePath = filePath + ".tmp";
- File tmpFile = new File(tmpFilePath);
- boolean tmpFileExists = tmpFile.exists();
- if (tmpFileExists) {
- tmpFileExists = !tmpFile.delete();
- }
- if (tmpFileExists) {
- throw new IOException("Unable to delete tmp file " + tmpFilePath);
+ File snapshotLocalDataFile = new File(filePath);
+ if (!localDataVersionNodes.getSnapshotVersions().isEmpty()) {
+ String tmpFilePath = filePath + ".tmp";
+ File tmpFile = new File(tmpFilePath);
+ boolean tmpFileExists = tmpFile.exists();
+ if (tmpFileExists) {
+ tmpFileExists = !tmpFile.delete();
+ }
+ if (tmpFileExists) {
+ throw new IOException("Unable to delete tmp file " + tmpFilePath);
+ }
+ snapshotLocalDataSerializer.save(new File(tmpFilePath), super.snapshotLocalData);
+ Files.move(tmpFile.toPath(), Paths.get(filePath), StandardCopyOption.ATOMIC_MOVE,
+ StandardCopyOption.REPLACE_EXISTING);
+ } else if (snapshotLocalDataFile.exists()) {
+ LOG.info("Deleting YAML file corresponding to snapshotId: {} in path : {}",
+ super.snapshotId, snapshotLocalDataFile.getAbsolutePath());
+ if (!snapshotLocalDataFile.delete()) {
+ throw new IOException("Unable to delete file " + snapshotLocalDataFile.getAbsolutePath());
+ }
}
- snapshotLocalDataSerializer.save(new File(tmpFilePath), super.snapshotLocalData);
- Files.move(tmpFile.toPath(), Paths.get(filePath), StandardCopyOption.ATOMIC_MOVE,
- StandardCopyOption.REPLACE_EXISTING);
- upsertNode(super.snapshotId, localDataVersionNodes);
+ SnapshotVersionsMeta previousVersionMeta = upsertNode(super.snapshotId, localDataVersionNodes);
+ checkForOphanVersionsAndIncrementCount(super.snapshotId, previousVersionMeta, localDataVersionNodes,
+ getSnapshotLocalData().getTransactionInfo() != null);
// Reset dirty bit
resetDirty();
}
}
- private void upsertNode(UUID snapshotId, SnapshotVersionsMeta snapshotVersions) throws IOException {
+ private void checkForOphanVersionsAndIncrementCount(UUID snapshotId, SnapshotVersionsMeta previousVersionsMeta,
+ SnapshotVersionsMeta currentVersionMeta, boolean isPurgeTransactionSet) {
+ if (previousVersionsMeta != null) {
+ Map currentVersionNodeMap = currentVersionMeta.getSnapshotVersions();
+ Map previousVersionNodeMap = previousVersionsMeta.getSnapshotVersions();
+ boolean versionsRemoved = previousVersionNodeMap.keySet().stream()
+ .anyMatch(version -> !currentVersionNodeMap.containsKey(version));
+
+ // The previous snapshotId could have become an orphan entry or could have orphan versions.(In case of
+ // version removals)
+ if (versionsRemoved || !Objects.equals(previousVersionsMeta.getPreviousSnapshotId(),
+ currentVersionMeta.getPreviousSnapshotId())) {
+ incrementOrphanCheckCount(previousVersionsMeta.getPreviousSnapshotId());
+ }
+ // If the transactionInfo set, this means the snapshot has been purged and the entire YAML file could have
+ // become an orphan. Otherwise if the version is updated it
+ // could mean that there could be some orphan version present within the
+ // same snapshot.
+ if (isPurgeTransactionSet || previousVersionsMeta.getVersion() != currentVersionMeta.getVersion()) {
+ incrementOrphanCheckCount(snapshotId);
+ }
+ }
+ }
+
+ private SnapshotVersionsMeta upsertNode(UUID snapshotId, SnapshotVersionsMeta snapshotVersions) throws IOException {
internalLock.writeLock().lock();
try {
SnapshotVersionsMeta existingSnapVersions = getVersionNodeMap().remove(snapshotId);
Map existingVersions = existingSnapVersions == null ? Collections.emptyMap() :
existingSnapVersions.getSnapshotVersions();
+ Map newVersions = snapshotVersions.getSnapshotVersions();
Map> predecessors = new HashMap<>();
// Track all predecessors of the existing versions and remove the node from the graph.
for (Map.Entry existingVersion : existingVersions.entrySet()) {
@@ -763,14 +910,16 @@ private void upsertNode(UUID snapshotId, SnapshotVersionsMeta snapshotVersions)
predecessors.put(existingVersion.getKey(), new ArrayList<>(localDataGraph.predecessors(existingVersionNode)));
localDataGraph.removeNode(existingVersionNode);
}
+
// Add the nodes to be added in the graph and map.
addSnapshotVersionMeta(snapshotId, snapshotVersions);
// Reconnect all the predecessors for existing nodes.
- for (Map.Entry entry : snapshotVersions.getSnapshotVersions().entrySet()) {
+ for (Map.Entry entry : newVersions.entrySet()) {
for (LocalDataVersionNode predecessor : predecessors.getOrDefault(entry.getKey(), Collections.emptyList())) {
localDataGraph.putEdge(predecessor, entry.getValue());
}
}
+ return existingSnapVersions;
} finally {
internalLock.writeLock().unlock();
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
index df26fa742e84..9aa56d2dd027 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshotLocalDataManager.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_SEPARATOR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OmSnapshotLocalDataYaml.YAML_FILE_EXTENSION;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
@@ -31,7 +32,9 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
@@ -40,6 +43,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -61,6 +65,7 @@
import org.apache.commons.compress.utils.Sets;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
@@ -88,6 +93,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
+import org.mockito.MockedStatic;
import org.mockito.MockitoAnnotations;
import org.rocksdb.LiveFileMetaData;
import org.yaml.snakeyaml.Yaml;
@@ -100,6 +106,8 @@ public class TestOmSnapshotLocalDataManager {
private static YamlSerializer snapshotLocalDataYamlSerializer;
private static List lockCapturor;
+ private static OzoneConfiguration conf;
+ private static Map purgedSnapshotIdMap;
@Mock
private OMMetadataManager omMetadataManager;
@@ -120,6 +128,7 @@ public class TestOmSnapshotLocalDataManager {
private AutoCloseable mocks;
private File snapshotsDir;
+ private MockedStatic snapshotUtilMock;
private static final String READ_LOCK_MESSAGE_ACQUIRE = "readLock acquire";
private static final String READ_LOCK_MESSAGE_UNLOCK = "readLock unlock";
@@ -128,6 +137,7 @@ public class TestOmSnapshotLocalDataManager {
@BeforeAll
public static void setupClass() {
+ conf = new OzoneConfiguration();
snapshotLocalDataYamlSerializer = new YamlSerializer(
new OmSnapshotLocalDataYaml.YamlFactory()) {
@@ -137,6 +147,7 @@ public void computeAndSetChecksum(Yaml yaml, OmSnapshotLocalData data) throws IO
}
};
lockCapturor = new ArrayList<>();
+ purgedSnapshotIdMap = new HashMap<>();
}
@AfterAll
@@ -162,6 +173,11 @@ public void setUp() throws IOException {
when(rdbStore.getSnapshotsParentDir()).thenReturn(snapshotsDir.getAbsolutePath());
when(rdbStore.getDbLocation()).thenReturn(dbLocation);
+ this.snapshotUtilMock = mockStatic(OmSnapshotManager.class, CALLS_REAL_METHODS);
+ purgedSnapshotIdMap.clear();
+ snapshotUtilMock.when(() -> OmSnapshotManager.isSnapshotPurged(any(), any(), any(), any()))
+ .thenAnswer(i -> purgedSnapshotIdMap.getOrDefault(i.getArgument(2), false));
+ conf.setInt(OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL, -1);
}
@AfterEach
@@ -172,6 +188,9 @@ public void tearDown() throws Exception {
if (mocks != null) {
mocks.close();
}
+ if (snapshotUtilMock != null) {
+ snapshotUtilMock.close();
+ }
}
private String getReadLockMessageAcquire(UUID snapshotId) {
@@ -276,7 +295,7 @@ private void mockSnapshotStore(UUID snapshotId, List sstFiles)
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = new ArrayList<>();
snapshotIds.add(null);
snapshotIds.addAll(createSnapshotLocalData(localDataManager, 20));
@@ -328,7 +347,7 @@ public void testLockOrderingAgainstAnotherSnapshot(boolean read) throws IOExcept
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testVersionLockResolution(boolean read) throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 5);
for (int snapIdx = 0; snapIdx < snapshotIds.size(); snapIdx++) {
UUID snapId = snapshotIds.get(snapIdx);
@@ -366,7 +385,7 @@ public void testVersionLockResolution(boolean read) throws IOException {
@Test
public void testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExisting() throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 2);
UUID snapId = snapshotIds.get(1);
try (WritableOmSnapshotLocalDataProvider omSnapshotLocalDataProvider =
@@ -382,7 +401,7 @@ public void testWriteVersionAdditionValidationWithoutPreviousSnapshotVersionExis
@Test
public void testUpdateTransactionInfo() throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
TransactionInfo transactionInfo = TransactionInfo.valueOf(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong());
UUID snapshotId = createSnapshotLocalData(localDataManager, 1).get(0);
@@ -401,7 +420,7 @@ public void testUpdateTransactionInfo() throws IOException {
@Test
public void testAddVersionFromRDB() throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 2);
addVersionsToLocalData(localDataManager, snapshotIds.get(0), ImmutableMap.of(4, 5, 6, 8));
UUID snapId = snapshotIds.get(1);
@@ -435,10 +454,79 @@ private void validateVersions(OmSnapshotLocalDataManager snapshotLocalDataManage
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testOrphanVersionDeletionWithVersionDeletion(boolean purgeSnapshot) throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
+ List snapshotIds = createSnapshotLocalData(localDataManager, 3);
+ UUID firstSnapId = snapshotIds.get(0);
+ UUID secondSnapId = snapshotIds.get(1);
+ UUID thirdSnapId = snapshotIds.get(2);
+
+ addVersionsToLocalData(localDataManager, firstSnapId, ImmutableMap.of(1, 1, 2, 2, 3, 3));
+ addVersionsToLocalData(localDataManager, secondSnapId, ImmutableMap.of(4, 2, 8, 1, 10, 3, 11, 3));
+ addVersionsToLocalData(localDataManager, thirdSnapId, ImmutableMap.of(5, 8, 13, 10));
+ assertEquals(new HashSet<>(snapshotIds), localDataManager.getSnapshotToBeCheckedForOrphans().keySet());
+ localDataManager.getSnapshotToBeCheckedForOrphans().clear();
+ purgedSnapshotIdMap.put(secondSnapId, purgeSnapshot);
+ localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, thirdSnapId);
+ try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(thirdSnapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ assertEquals(Sets.newHashSet(0, 13), snapshotLocalData.getVersionSstFileInfos().keySet());
+ }
+ assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId));
+ localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, secondSnapId);
+ try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(secondSnapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ if (purgeSnapshot) {
+ assertEquals(Sets.newHashSet(0, 10), snapshotLocalData.getVersionSstFileInfos().keySet());
+ } else {
+ assertEquals(Sets.newHashSet(0, 10, 11), snapshotLocalData.getVersionSstFileInfos().keySet());
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testOrphanVersionDeletionWithChainUpdate(boolean purgeSnapshot) throws IOException {
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
+ List snapshotIds = createSnapshotLocalData(localDataManager, 3);
+ UUID firstSnapId = snapshotIds.get(0);
+ UUID secondSnapId = snapshotIds.get(1);
+ UUID thirdSnapId = snapshotIds.get(2);
+
+ addVersionsToLocalData(localDataManager, firstSnapId, ImmutableMap.of(1, 1, 2, 2, 3, 3));
+ addVersionsToLocalData(localDataManager, secondSnapId, ImmutableMap.of(4, 2, 8, 1, 10, 3, 11, 3));
+ addVersionsToLocalData(localDataManager, thirdSnapId, ImmutableMap.of(5, 8, 13, 10));
+ purgedSnapshotIdMap.put(secondSnapId, purgeSnapshot);
+ try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider =
+ localDataManager.getWritableOmSnapshotLocalData(thirdSnapId, firstSnapId)) {
+ snapshotLocalDataProvider.commit();
+ }
+ try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(thirdSnapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ assertEquals(Sets.newHashSet(0, 5, 13), snapshotLocalData.getVersionSstFileInfos().keySet());
+ assertEquals(firstSnapId, snapshotLocalData.getPreviousSnapshotId());
+ }
+
+ assertTrue(localDataManager.getSnapshotToBeCheckedForOrphans().containsKey(secondSnapId));
+ localDataManager.checkOrphanSnapshotVersions(omMetadataManager, null, secondSnapId);
+ if (purgeSnapshot) {
+ assertThrows(NoSuchFileException.class,
+ () -> localDataManager.getOmSnapshotLocalData(secondSnapId));
+ assertFalse(localDataManager.getVersionNodeMap().containsKey(secondSnapId));
+ } else {
+ try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(secondSnapId)) {
+ OmSnapshotLocalData snapshotLocalData = snap.getSnapshotLocalData();
+ assertEquals(Sets.newHashSet(0, 11), snapshotLocalData.getVersionSstFileInfos().keySet());
+ }
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testWriteWithChainUpdate(boolean previousSnapshotExisting) throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 3 + (previousSnapshotExisting ? 1 : 0));
int snapshotIdx = 1 + (previousSnapshotExisting ? 1 : 0);
for (UUID snapshotId : snapshotIds) {
@@ -490,7 +578,7 @@ public void testWriteWithChainUpdate(boolean previousSnapshotExisting) throws IO
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testWriteVersionValidation(boolean nextVersionExisting) throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 3);
UUID prevSnapId = snapshotIds.get(0);
UUID snapId = snapshotIds.get(1);
@@ -564,7 +652,7 @@ private void addVersionsToLocalData(OmSnapshotLocalDataManager snapshotLocalData
@ParameterizedTest
@ValueSource(ints = {1, 2, 3})
public void testNeedsDefrag(int previousVersion) throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 2);
for (UUID snapshotId : snapshotIds) {
try (ReadableOmSnapshotLocalDataProvider snap = localDataManager.getOmSnapshotLocalData(snapshotId)) {
@@ -584,7 +672,7 @@ public void testNeedsDefrag(int previousVersion) throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testVersionResolution(boolean read) throws IOException {
- localDataManager = new OmSnapshotLocalDataManager(omMetadataManager);
+ localDataManager = new OmSnapshotLocalDataManager(omMetadataManager, null, conf);
List snapshotIds = createSnapshotLocalData(localDataManager, 5);
List