Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1937,8 +1937,9 @@ message SnapshotMoveKeyInfos {
}

message SnapshotPurgeRequest {
repeated string snapshotDBKeys = 1;
repeated string snapshotDBKeys = 1 [deprecated = true];
repeated string updatedSnapshotDBKey = 2 [deprecated = true];
optional string snapshotKey = 3;
}

message SetSnapshotPropertyRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse.OmPurgeResponse;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
Expand All @@ -43,11 +46,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

Expand All @@ -71,103 +74,124 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

final long trxnLogIndex = termIndex.getIndex();

OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
ozoneManager.getMetadataManager();
SnapshotChainManager snapshotChainManager =
omMetadataManager.getSnapshotChainManager();

OMClientResponse omClientResponse = null;
OMClientResponse omClientResponse;

OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
SnapshotPurgeRequest snapshotPurgeRequest = getOmRequest()
.getSnapshotPurgeRequest();

try {
List<String> snapshotDbKeys = snapshotPurgeRequest
.getSnapshotDBKeysList();
Map<String, SnapshotInfo> updatedSnapInfos = new HashMap<>();
Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots =
new HashMap<>();

// Each snapshot purge operation does three things:
// 1. Update the snapshot chain,
// 2. Update the deep clean flag for the next active snapshot (So that it can be
// deep cleaned by the KeyDeletingService in the next run),
// 3. Finally, purge the snapshot.
// All of these steps have to be performed only when it acquires all the necessary
// locks (lock on the snapshot to be purged, lock on the next active snapshot, and
// lock on the next path and global previous snapshots). Ideally, there is no need
// for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine
// is going to process each request sequentially.
//
// But there is a problem with that. After filtering unnecessary SST files for a snapshot,
// SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot
// use SetSnapshotProperty API because it runs on each OM independently and One OM does
// not know if the snapshot has been filtered on the other OM in HA environment.
//
// If locks are not taken snapshot purge and SstFilteringService will cause a race condition
// and override one's update with another.
for (String snapTableKey : snapshotDbKeys) {
// To acquire all the locks, a set is maintained which is keyed by snapshotTableKey.
// snapshotTableKey is nothing but /volumeName/bucketName/snapshotName.
// Once all the locks are acquired, it performs the three steps mentioned above and
// release all the locks after that.
Set<Triple<String, String, String>> lockSet = new HashSet<>(4, 1);
try {
if (omMetadataManager.getSnapshotInfoTable().get(snapTableKey) == null) {
// Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
LOG.warn("The snapshot {} is not longer in snapshot table, It maybe removed in the previous " +
"Snapshot purge request.", snapTableKey);
continue;
}

acquireLock(lockSet, snapTableKey, omMetadataManager);
SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapTableKey);

SnapshotInfo nextSnapshot =
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

if (nextSnapshot != null) {
acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager);
}

// Update the chain first so that it has all the necessary locks before updating deep clean.
updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex,
updatedPathPreviousAndGlobalSnapshots);
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex, updatedSnapInfos);
// Remove and close snapshot's RocksDB instance from SnapshotCache.
omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());
// Update SnapshotInfoTable cache.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
} finally {
for (Triple<String, String, String> lockKey: lockSet) {
omMetadataManager.getLock()
.releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight());
}
// This is for backward compatibility. If ratis has batch purge transactions which are replayed
// either during service upgraded or later for any other reason when OM has been upgraded to non-batch.
if (!snapshotPurgeRequest.getSnapshotDBKeysList().isEmpty()) {
List<OmPurgeResponse> omPurgeResponses = new ArrayList<>();
for (String snapshotKey : snapshotPurgeRequest.getSnapshotDBKeysList()) {
OmPurgeResponse omPurgeResponse = purgeSnapshot(snapshotKey, ozoneManager, trxnLogIndex);
omPurgeResponses.add(omPurgeResponse);
}
omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), omPurgeResponses);
LOG.info("OmPurgeResponse list: {}.", omPurgeResponses);
} else if (snapshotPurgeRequest.hasSnapshotKey()) {
OmPurgeResponse omPurgeResponse =
purgeSnapshot(snapshotPurgeRequest.getSnapshotKey(), ozoneManager, trxnLogIndex);
omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(), omPurgeResponse);
LOG.info("OmPurgeResponse: {}.", omPurgeResponse);
} else {
throw new OMException("Both snapshotDBKeysList and snapshotKey are null." +
" One of them is required for snapshot purge operations.", OMException.ResultCodes.INVALID_REQUEST);
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
snapshotDbKeys, updatedSnapInfos,
updatedPathPreviousAndGlobalSnapshots);

omMetrics.incNumSnapshotPurges();
LOG.info("Successfully executed snapshotPurgeRequest: {{}} along with updating deep clean flags for " +
"snapshots: {} and global and previous for snapshots:{}.",
snapshotPurgeRequest, updatedSnapInfos.keySet(), updatedPathPreviousAndGlobalSnapshots.keySet());
LOG.debug("Successfully executed snapshotPurgeRequest: {{}}.", snapshotPurgeRequest);
} catch (IOException ex) {
omClientResponse = new OMSnapshotPurgeResponse(
createErrorOMResponse(omResponse, ex));
omClientResponse = new OMSnapshotPurgeResponse(createErrorOMResponse(omResponse, ex));
omMetrics.incNumSnapshotPurgeFails();
LOG.error("Failed to execute snapshotPurgeRequest:{{}}.", snapshotPurgeRequest, ex);
}

return omClientResponse;
}

// Each snapshot purge operation does three things:
// 1. Update the snapshot chain,
// 2. Update the deep clean flag for the next active snapshot (So that it can be
// deep cleaned by the KeyDeletingService in the next run),
// 3. Finally, purge the snapshot.
// All of these steps have to be performed only when it acquires all the necessary
// locks (lock on the snapshot to be purged, lock on the next active snapshot, and
// lock on the next path and global previous snapshots). Ideally, there is no need
// for locks for snapshot purge and can rely on OMStateMachine because OMStateMachine
// is going to process each request sequentially.
//
// But there is a problem with that. After filtering unnecessary SST files for a snapshot,
// SstFilteringService updates that snapshot's SstFilter flag. SstFilteringService cannot
// use SetSnapshotProperty API because it runs on each OM independently and One OM does
// not know if the snapshot has been filtered on the other OM in HA environment.
//
// If locks are not taken snapshot purge and SstFilteringService will cause a race condition
// and override one's update with another.
private OmPurgeResponse purgeSnapshot(String snapshotKey,
OzoneManager ozoneManager,
long trxnLogIndex) throws IOException {

OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager();
SnapshotChainManager snapshotChainManager = omMetadataManager.getSnapshotChainManager();

if (omMetadataManager.getSnapshotInfoTable().get(snapshotKey) == null) {
// Snapshot may have been purged in the previous iteration of SnapshotDeletingService.
throw new OMException("Snapshot: '" + snapshotKey + "}' is no longer exist " +
"in snapshot table. Might be removed in previous run.", OMException.ResultCodes.FILE_NOT_FOUND);
}

// To acquire all the locks, a set is maintained which is keyed by a triple of volumeName, bucketName and
// snapshotName. SnapshotInfoTable key (which is /volumeName/bucketName/snapshotName) is not directly
// because volumeName, bucketName and snapshotName can't be obtained after purging snapshot from cache.
// Once all the necessary locks are acquired, the three steps mentioned above are performed and
// locks are release after that.
Set<Triple<String, String, String>> lockSet = new HashSet<>(4, 1);

try {
acquireLock(lockSet, snapshotKey, omMetadataManager);

SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable().get(snapshotKey);
SnapshotInfo nextSnapshot =
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);

if (nextSnapshot != null) {
acquireLock(lockSet, nextSnapshot.getTableKey(), omMetadataManager);
}

// Step 1: Update the snapshot chain.
Pair<SnapshotInfo, SnapshotInfo> pathToGlobalSnapshotInto =
updateSnapshotChainAndCache(lockSet, omMetadataManager, fromSnapshot, trxnLogIndex);
SnapshotInfo nextPathSnapshotInfo = null;
SnapshotInfo nextGlobalSnapshotInfo = null;

if (pathToGlobalSnapshotInto != null) {
nextPathSnapshotInfo = pathToGlobalSnapshotInto.getLeft();
nextGlobalSnapshotInfo = pathToGlobalSnapshotInto.getRight();
}

// Step 2: Update the deep clean flag for the next active snapshot
SnapshotInfo nextActiveSnapshotInfo = updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex);

// Remove and close snapshot's RocksDB instance from SnapshotCache.
ozoneManager.getOmSnapshotManager().invalidateCacheEntry(fromSnapshot.getSnapshotId());

// Step 3: Purge the snapshot from SnapshotInfoTable cache.
ozoneManager.getMetadataManager().getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));

return new OmPurgeResponse(snapshotKey, nextPathSnapshotInfo, nextGlobalSnapshotInfo,
nextActiveSnapshotInfo);
} finally {
lockSet.forEach(lockKey -> omMetadataManager.getLock()
.releaseWriteLock(SNAPSHOT_LOCK, lockKey.getLeft(), lockKey.getMiddle(), lockKey.getRight()));
}
}

private void acquireLock(Set<Triple<String, String, String>> lockSet, String snapshotTableKey,
OMMetadataManager omMetadataManager) throws IOException {
SnapshotInfo snapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapshotTableKey);
Expand All @@ -187,23 +211,25 @@ private void acquireLock(Set<Triple<String, String, String>> lockSet, String sna
}
}

private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
Map<String, SnapshotInfo> updatedSnapInfos) throws IOException {
if (snapInfo != null) {
// Fetch the latest value again after acquiring lock.
SnapshotInfo updatedSnapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey());

// Setting next snapshot deep clean to false, Since the
// current snapshot is deleted. We can potentially
// reclaim more keys in the next snapshot.
updatedSnapshotInfo.setDeepClean(false);

// Update table cache first
omMetadataManager.getSnapshotInfoTable().addCacheEntry(new CacheKey<>(updatedSnapshotInfo.getTableKey()),
CacheValue.get(trxnLogIndex, updatedSnapshotInfo));
updatedSnapInfos.put(updatedSnapshotInfo.getTableKey(), updatedSnapshotInfo);
private SnapshotInfo updateSnapshotInfoAndCache(SnapshotInfo snapInfo, OmMetadataManagerImpl omMetadataManager,
long trxnLogIndex) throws IOException {
if (snapInfo == null) {
return null;
}

// Fetch the latest value again after acquiring lock.
SnapshotInfo updatedSnapshotInfo = omMetadataManager.getSnapshotInfoTable().get(snapInfo.getTableKey());

// Setting next snapshot deep clean to false, Since the
// current snapshot is deleted. We can potentially
// reclaim more keys in the next snapshot.
updatedSnapshotInfo.setDeepClean(false);

// Update table cache first
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(updatedSnapshotInfo.getTableKey()),
CacheValue.get(trxnLogIndex, updatedSnapshotInfo));
return updatedSnapshotInfo;
}

/**
Expand All @@ -212,19 +238,14 @@ private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
* It also returns the pair of updated next path and global snapshots to
* update in DB.
*/
private void updateSnapshotChainAndCache(
private Pair<SnapshotInfo, SnapshotInfo> updateSnapshotChainAndCache(
Set<Triple<String, String, String>> lockSet,
OmMetadataManagerImpl metadataManager,
SnapshotInfo snapInfo,
long trxnLogIndex,
Map<String, SnapshotInfo> updatedPathPreviousAndGlobalSnapshots
long trxnLogIndex
) throws IOException {
if (snapInfo == null) {
return;
}

SnapshotChainManager snapshotChainManager = metadataManager
.getSnapshotChainManager();
Table<String, SnapshotInfo> snapshotInfoTable = metadataManager.getSnapshotInfoTable();
SnapshotChainManager snapshotChainManager = metadataManager.getSnapshotChainManager();

// If the snapshot is deleted in the previous run, then the in-memory
// SnapshotChainManager might throw NoSuchElementException as the snapshot
Expand All @@ -237,16 +258,15 @@ private void updateSnapshotChainAndCache(
hasNextGlobalSnapshot = snapshotChainManager.hasNextGlobalSnapshot(
snapInfo.getSnapshotId());
} catch (NoSuchElementException ex) {
return;
return null;
}

String nextPathSnapshotKey = null;

if (hasNextPathSnapshot) {
UUID nextPathSnapshotId = snapshotChainManager.nextPathSnapshot(
snapInfo.getSnapshotPath(), snapInfo.getSnapshotId());
nextPathSnapshotKey = snapshotChainManager
.getTableKey(nextPathSnapshotId);
nextPathSnapshotKey = snapshotChainManager.getTableKey(nextPathSnapshotId);

// Acquire lock from the snapshot
acquireLock(lockSet, nextPathSnapshotKey, metadataManager);
Expand All @@ -262,43 +282,35 @@ private void updateSnapshotChainAndCache(
}

SnapshotInfo nextPathSnapInfo =
nextPathSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextPathSnapshotKey) : null;
nextPathSnapshotKey != null ? snapshotInfoTable.get(nextPathSnapshotKey) : null;

SnapshotInfo nextGlobalSnapInfo =
nextGlobalSnapshotKey != null ? metadataManager.getSnapshotInfoTable().get(nextGlobalSnapshotKey) : null;
nextGlobalSnapshotKey != null ? snapshotInfoTable.get(nextGlobalSnapshotKey) : null;

// Updates next path snapshot's previous snapshot ID
if (nextPathSnapInfo != null) {
nextPathSnapInfo.setPathPreviousSnapshotId(snapInfo.getPathPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextPathSnapInfo.getTableKey()),
snapshotInfoTable.addCacheEntry(new CacheKey<>(nextPathSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextPathSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
}

// Updates next global snapshot's previous snapshot ID
// If both next global and path snapshot are same, it may overwrite
// nextPathSnapInfo.setPathPreviousSnapshotID(), adding this check
// will prevent it.
if (nextGlobalSnapInfo != null && nextPathSnapInfo != null &&
nextGlobalSnapInfo.getSnapshotId().equals(nextPathSnapInfo.getSnapshotId())) {
Objects.equals(nextGlobalSnapInfo.getSnapshotId(), nextPathSnapInfo.getSnapshotId())) {
nextPathSnapInfo.setGlobalPreviousSnapshotId(snapInfo.getGlobalPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextPathSnapInfo.getTableKey()),
snapshotInfoTable.addCacheEntry(new CacheKey<>(nextPathSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextPathSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextPathSnapInfo.getTableKey(), nextPathSnapInfo);
nextGlobalSnapInfo = nextPathSnapInfo;
} else if (nextGlobalSnapInfo != null) {
nextGlobalSnapInfo.setGlobalPreviousSnapshotId(
snapInfo.getGlobalPreviousSnapshotId());
metadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(nextGlobalSnapInfo.getTableKey()),
nextGlobalSnapInfo.setGlobalPreviousSnapshotId(snapInfo.getGlobalPreviousSnapshotId());
snapshotInfoTable.addCacheEntry(new CacheKey<>(nextGlobalSnapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, nextGlobalSnapInfo));
updatedPathPreviousAndGlobalSnapshots
.put(nextGlobalSnapInfo.getTableKey(), nextGlobalSnapInfo);
}

snapshotChainManager.deleteSnapshot(snapInfo);
return Pair.of(nextPathSnapInfo, nextGlobalSnapInfo);
}
}
Loading