Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static SnapshotStatus valueOf(SnapshotStatusProto status) {
* RocksDB's transaction sequence number at the time of checkpoint creation.
*/
private long dbTxSequenceNumber;
private boolean deepClean;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should it be deepCleaned?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Private constructor, constructed via builder.
Expand All @@ -140,6 +141,8 @@ public static SnapshotStatus valueOf(SnapshotStatusProto status) {
* @param globalPreviousSnapshotId - Snapshot global previous snapshot id.
* @param snapshotPath - Snapshot path, bucket .snapshot path.
* @param checkpointDir - Snapshot checkpoint directory.
* @param dbTxSequenceNumber - RDB latest transaction sequence number.
* @param deepCleaned - To be deep cleaned status for snapshot.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private SnapshotInfo(UUID snapshotId,
Expand All @@ -153,7 +156,8 @@ private SnapshotInfo(UUID snapshotId,
UUID globalPreviousSnapshotId,
String snapshotPath,
String checkpointDir,
long dbTxSequenceNumber) {
long dbTxSequenceNumber,
boolean deepCleaned) {
this.snapshotId = snapshotId;
this.name = name;
this.volumeName = volumeName;
Expand All @@ -166,6 +170,7 @@ private SnapshotInfo(UUID snapshotId,
this.snapshotPath = snapshotPath;
this.checkpointDir = checkpointDir;
this.dbTxSequenceNumber = dbTxSequenceNumber;
this.deepClean = deepCleaned;
}

public void setName(String name) {
Expand Down Expand Up @@ -204,6 +209,14 @@ public void setCheckpointDir(String checkpointDir) {
this.checkpointDir = checkpointDir;
}

public boolean getDeepClean() {
return deepClean;
}

public void setDeepClean(boolean deepClean) {
this.deepClean = deepClean;
}

public UUID getSnapshotId() {
return snapshotId;
}
Expand Down Expand Up @@ -265,7 +278,8 @@ public SnapshotInfo.Builder toBuilder() {
.setPathPreviousSnapshotId(pathPreviousSnapshotId)
.setGlobalPreviousSnapshotId(globalPreviousSnapshotId)
.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir);
.setCheckpointDir(checkpointDir)
.setDeepClean(deepClean);
}

/**
Expand All @@ -284,6 +298,7 @@ public static class Builder {
private String snapshotPath;
private String checkpointDir;
private long dbTxSequenceNumber;
private boolean deepClean;

public Builder() {
// default values
Expand Down Expand Up @@ -350,6 +365,11 @@ public Builder setDbTxSequenceNumber(long dbTxSequenceNumber) {
return this;
}

public Builder setDeepClean(boolean deepClean) {
this.deepClean = deepClean;
return this;
}

public SnapshotInfo build() {
Preconditions.checkNotNull(name);
return new SnapshotInfo(
Expand All @@ -364,7 +384,8 @@ public SnapshotInfo build() {
globalPreviousSnapshotId,
snapshotPath,
checkpointDir,
dbTxSequenceNumber
dbTxSequenceNumber,
deepClean
);
}
}
Expand Down Expand Up @@ -393,7 +414,8 @@ public OzoneManagerProtocolProtos.SnapshotInfo getProtobuf() {

sib.setSnapshotPath(snapshotPath)
.setCheckpointDir(checkpointDir)
.setDbTxSequenceNumber(dbTxSequenceNumber);
.setDbTxSequenceNumber(dbTxSequenceNumber)
.setDeepClean(deepClean);
return sib.build();
}

Expand Down Expand Up @@ -425,6 +447,10 @@ public static SnapshotInfo getFromProtobuf(
fromProtobuf(snapshotInfoProto.getGlobalPreviousSnapshotID()));
}

if (snapshotInfoProto.hasDeepClean()) {
osib.setDeepClean(snapshotInfoProto.getDeepClean());
}

osib.setSnapshotPath(snapshotInfoProto.getSnapshotPath())
.setCheckpointDir(snapshotInfoProto.getCheckpointDir())
.setDbTxSequenceNumber(snapshotInfoProto.getDbTxSequenceNumber());
Expand Down Expand Up @@ -509,7 +535,8 @@ public static SnapshotInfo newInstance(String volumeName,
.setGlobalPreviousSnapshotId(INITIAL_SNAPSHOT_ID)
.setSnapshotPath(volumeName + OM_KEY_PREFIX + bucketName)
.setVolumeName(volumeName)
.setBucketName(bucketName);
.setBucketName(bucketName)
.setDeepClean(true);

if (snapshotId != null) {
builder.setCheckpointDir(getCheckpointDirName(snapshotId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ private SnapshotInfo createSnapshotInfo() {
.setSnapshotPath(SNAPSHOT_PATH)
.setCheckpointDir(CHECKPOINT_DIR)
.setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
.setDeepClean(true)
.build();
}

Expand All @@ -83,6 +84,7 @@ private OzoneManagerProtocolProtos.SnapshotInfo createSnapshotInfoProto() {
.setSnapshotPath(SNAPSHOT_PATH)
.setCheckpointDir(CHECKPOINT_DIR)
.setDbTxSequenceNumber(DB_TX_SEQUENCE_NUMBER)
.setDeepClean(true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ message SnapshotInfo {
optional string snapshotPath = 10;
optional string checkpointDir = 11;
optional int64 dbTxSequenceNumber = 12;
optional bool deepClean = 13;
}

message OzoneObj {
Expand Down Expand Up @@ -1746,6 +1747,7 @@ message SnapshotMoveKeyInfos {

message SnapshotPurgeRequest {
repeated string snapshotDBKeys = 1;
repeated string updatedSnapshotDBKey = 2;
}

message DeleteTenantRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveDeletedKeysResponse;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
Expand All @@ -41,7 +42,6 @@

import java.io.IOException;
import java.util.List;
import java.util.UUID;

import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.FILESYSTEM_SNAPSHOT;
Expand Down Expand Up @@ -86,7 +86,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
fromSnapshot.getBucketName(),
getSnapshotPrefix(fromSnapshot.getName()), true);

nextSnapshot = getNextActiveSnapshot(fromSnapshot,
nextSnapshot = SnapshotUtils.getNextActiveSnapshot(fromSnapshot,
snapshotChainManager, omSnapshotManager);

// Get next non-deleted snapshot.
Expand Down Expand Up @@ -122,32 +122,5 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

return omClientResponse;
}

/**
* Get the next non deleted snapshot in the snapshot chain.
*/
private SnapshotInfo getNextActiveSnapshot(
SnapshotInfo snapInfo,
SnapshotChainManager chainManager,
OmSnapshotManager omSnapshotManager
) throws IOException {
while (chainManager.hasNextPathSnapshot(snapInfo.getSnapshotPath(),
snapInfo.getSnapshotId())) {

UUID nextPathSnapshot = chainManager.nextPathSnapshot(
snapInfo.getSnapshotPath(), snapInfo.getSnapshotId());

String tableKey = chainManager.getTableKey(nextPathSnapshot);
SnapshotInfo nextSnapshotInfo =
omSnapshotManager.getSnapshotInfo(tableKey);

if (nextSnapshotInfo.getSnapshotStatus().equals(
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE)) {
return nextSnapshotInfo;
}
snapInfo = nextSnapshotInfo;
}
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,25 @@

package org.apache.hadoop.ozone.om.request.snapshot;

import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

/**
Expand All @@ -44,6 +53,11 @@ public OMSnapshotPurgeRequest(OMRequest omRequest) {
@Override
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
ozoneManager.getMetadataManager();
SnapshotChainManager snapshotChainManager =
omMetadataManager.getSnapshotChainManager();

OMClientResponse omClientResponse = null;

Expand All @@ -52,14 +66,61 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
SnapshotPurgeRequest snapshotPurgeRequest = getOmRequest()
.getSnapshotPurgeRequest();

List<String> snapshotDbKeys = snapshotPurgeRequest
.getSnapshotDBKeysList();
try {
List<String> snapshotDbKeys = snapshotPurgeRequest
.getSnapshotDBKeysList();
List<String> snapInfosToUpdate = snapshotPurgeRequest
.getUpdatedSnapshotDBKeyList();
HashMap<String, SnapshotInfo> updatedSnapInfo = new HashMap<>();

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
snapshotDbKeys);
addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
omDoubleBufferHelper);
// Snapshots that are already deepCleaned by the KeyDeletingService
// can be marked as deepCleaned.
for (String snapTableKey : snapInfosToUpdate) {
SnapshotInfo snapInfo = omMetadataManager.getSnapshotInfoTable()
.get(snapTableKey);

updateSnapshotInfoAndCache(snapInfo, omMetadataManager,
trxnLogIndex, updatedSnapInfo, false);
}

// Snapshots that are purged by the SnapshotDeletingService
// will update the next snapshot so that is can be deep cleaned
// by the KeyDeletingService in the next run.
for (String snapTableKey : snapshotDbKeys) {
SnapshotInfo fromSnapshot = omMetadataManager.getSnapshotInfoTable()
.get(snapTableKey);

SnapshotInfo nextSnapshot = SnapshotUtils
.getNextActiveSnapshot(fromSnapshot,
snapshotChainManager, omSnapshotManager);
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager,
trxnLogIndex, updatedSnapInfo, true);
}

omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
snapshotDbKeys, updatedSnapInfo);
} catch (IOException ex) {
omClientResponse = new OMSnapshotPurgeResponse(
createErrorOMResponse(omResponse, ex));
} finally {
addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
omDoubleBufferHelper);
}

return omClientResponse;
}

private void updateSnapshotInfoAndCache(SnapshotInfo snapInfo,
OmMetadataManagerImpl omMetadataManager, long trxnLogIndex,
HashMap<String, SnapshotInfo> updatedSnapInfo, boolean deepClean) {
if (snapInfo != null) {
snapInfo.setDeepClean(deepClean);

// Update table cache first
omMetadataManager.getSnapshotInfoTable().addCacheEntry(
new CacheKey<>(snapInfo.getTableKey()),
CacheValue.get(trxnLogIndex, snapInfo));
updatedSnapInfo.put(snapInfo.getTableKey(), snapInfo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
// Init Batch Operation for snapshot db.
try (BatchOperation writeBatch = fromSnapshotStore.initBatchOperation()) {
processKeys(writeBatch, fromSnapshot.getMetadataManager());
processKeysToUpdate(writeBatch, fromSnapshot.getMetadataManager());
fromSnapshotStore.commitBatchOperation(writeBatch);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;

Expand All @@ -48,12 +50,24 @@
public class OMSnapshotPurgeResponse extends OMClientResponse {
private static final Logger LOG =
LoggerFactory.getLogger(OMSnapshotPurgeResponse.class);
private final List<String> snapshotDbKeys;
private List<String> snapshotDbKeys;
private HashMap<String, SnapshotInfo> updatedSnapInfo;

public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse,
@Nonnull List<String> snapshotDbKeys) {
@Nonnull List<String> snapshotDbKeys,
HashMap<String, SnapshotInfo> updatedSnapInfo) {
super(omResponse);
this.snapshotDbKeys = snapshotDbKeys;
this.updatedSnapInfo = updatedSnapInfo;
}

/**
* For when the request is not successful.
* For a successful request, the other constructor should be used.
*/
public OMSnapshotPurgeResponse(@Nonnull OMResponse omResponse) {
super(omResponse);
checkStatusNotOK();
}

@Override
Expand All @@ -62,6 +76,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,

OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
omMetadataManager;
updateSnapInfo(metadataManager, batchOperation);
for (String dbKey: snapshotDbKeys) {
SnapshotInfo snapshotInfo = omMetadataManager
.getSnapshotInfoTable().get(dbKey);
Expand All @@ -80,6 +95,15 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,
}
}

private void updateSnapInfo(OmMetadataManagerImpl metadataManager,
BatchOperation batchOp)
throws IOException {
for (Map.Entry<String, SnapshotInfo> entry : updatedSnapInfo.entrySet()) {
metadataManager.getSnapshotInfoTable().putWithBatch(batchOp,
entry.getKey(), entry.getValue());
}
}

/**
* Cleans up the snapshot chain and updates next snapshot's
* previousPath and previousGlobal IDs.
Expand Down
Loading