Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1196,10 +1196,10 @@ message DeletedKeys {
repeated string keys = 3;
}



// Request to purge a batch of already-deleted keys from the deletedTable.
message PurgeKeysRequest {
repeated DeletedKeys deletedKeys = 1;
// If set, keys are purged from the named snapshot's DB instead of the
// active DB.
optional string snapshotTableKey = 2;
}

message PurgeKeysResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,6 @@ boolean recoverTrash(String volumeName, String bucketName,
List<OmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;

/**
* Returns a list of pending deletion key info that ups to the given count.
* Each entry is a {@link BlockGroup}, which contains the info about the key
* name and all its associated block IDs. A pending deletion key is stored
* with #deleting# prefix in OM DB.
*
* @param count max number of keys to return.
* @return a list of {@link BlockGroup} represent keys and blocks.
* @throws IOException
*/
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;

/**
* Returns the names of up to {@code count} open keys whose age is
* greater than or equal to {@code expireThreshold}.
Expand Down Expand Up @@ -544,4 +532,10 @@ String getMultipartKey(long volumeId, long bucketId,
*/
long getBucketId(String volume, String bucket) throws IOException;

/**
 * Returns the list of {@link BlockGroup} entries (key name plus all
 * associated block IDs) for a key in the deletedTable.
 *
 * @param deletedKey key to be purged from the deletedTable
 * @return list of {@link BlockGroup} for the key, or null if the key is
 *         not present in the deletedTable
 * @throws IOException in case of a DB read error
 */
List<BlockGroup> getBlocksForKeyDelete(String deletedKey) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ public void start(OzoneConfiguration configuration) {
TimeUnit.MILLISECONDS);
try {
snapshotDeletingService = new SnapshotDeletingService(
snapshotServiceInterval, snapshotServiceTimeout, ozoneManager);
snapshotServiceInterval, snapshotServiceTimeout,
ozoneManager, scmClient.getBlockClient());
snapshotDeletingService.start();
} catch (IOException e) {
LOG.error("Error starting Snapshot Deleting Service", e);
Expand Down Expand Up @@ -619,7 +620,10 @@ public List<RepeatedOmKeyInfo> listTrash(String volumeName,
@Override
public List<BlockGroup> getPendingDeletionKeys(final int count)
throws IOException {
return metadataManager.getPendingDeletionKeys(count);
OmMetadataManagerImpl omMetadataManager =
(OmMetadataManagerImpl) metadataManager;
return omMetadataManager
.getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
Expand Down Expand Up @@ -343,7 +344,7 @@ protected OmMetadataManagerImpl() {
File checkpoint = Paths.get(metaDir.toPath().toString(), dbName).toFile();
RDBCheckpointManager.waitForCheckpointDirectoryExist(checkpoint);
}
setStore(loadDB(conf, metaDir, dbName, true,
setStore(loadDB(conf, metaDir, dbName, false,
java.util.Optional.of(Boolean.TRUE)));
initializeOmTables(false);
}
Expand Down Expand Up @@ -1394,16 +1395,34 @@ private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
}
}

@Override
public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
throws IOException {
/**
* Returns a list of pending deletion key info up to the limit.
* Each entry is a {@link BlockGroup}, which contains the info about the key
* name and all its associated block IDs.
*
* @param keyCount max number of keys to return.
* @param omSnapshotManager SnapshotManager
* @return a list of {@link BlockGroup} represent keys and blocks.
* @throws IOException
*/
public List<BlockGroup> getPendingDeletionKeys(final int keyCount,
OmSnapshotManager omSnapshotManager) throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
keyIter = getDeletedTable().iterator()) {
int currentCount = 0;
while (keyIter.hasNext() && currentCount < keyCount) {
KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
if (kv != null) {
List<BlockGroup> blockGroupList = Lists.newArrayList();
// Get volume name and bucket name
String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
// Get the latest snapshot in snapshot path.
OmSnapshot latestSnapshot = getLatestSnapshot(keySplit[1],
keySplit[2], omSnapshotManager);
String bucketKey = getBucketKey(keySplit[1], keySplit[2]);
OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);

// Multiple keys with the same path can be queued in one DB entry
RepeatedOmKeyInfo infoList = kv.getValue();
for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) {
Expand All @@ -1422,6 +1441,37 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
// 4. Further optimization: Skip all snapshotted keys altogether
// e.g. by prefixing all unreclaimable keys, then calling seek

if (latestSnapshot != null) {
Table<String, OmKeyInfo> prevKeyTable =
latestSnapshot.getMetadataManager().getKeyTable(
bucketInfo.getBucketLayout());
String prevDbKey;
if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
long volumeId = getVolumeId(info.getVolumeName());
prevDbKey = getOzonePathKey(volumeId,
bucketInfo.getObjectID(),
info.getParentObjectID(),
info.getKeyName());
} else {
prevDbKey = getOzoneKey(info.getVolumeName(),
info.getBucketName(),
info.getKeyName());
}

OmKeyInfo omKeyInfo = prevKeyTable.get(prevDbKey);
if (omKeyInfo != null &&
info.getObjectID() == omKeyInfo.getObjectID()) {
// TODO: [SNAPSHOT] For now, we are not cleaning up a key in
// active DB's deletedTable if any one of the keys in
// RepeatedOmKeyInfo exists in last snapshot's key/fileTable.
// Might need to refactor OMKeyDeleteRequest first to take
// actual reclaimed key objectIDs as input
// in order to avoid any race condition.
blockGroupList.clear();
break;
}
}

// Add all blocks from all versions of the key to the deletion list
for (OmKeyLocationInfoGroup keyLocations :
info.getKeyLocationVersions()) {
Expand All @@ -1432,16 +1482,40 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
.setKeyName(kv.getKey())
.addAllBlockIDs(item)
.build();
keyBlocksList.add(keyBlocks);
blockGroupList.add(keyBlocks);
}
currentCount++;
}
keyBlocksList.addAll(blockGroupList);
}
}
}
return keyBlocksList;
}

/**
 * Looks up the most recent snapshot in the snapshot chain for the given
 * volume/bucket snapshot path.
 *
 * @param volumeName volume of the snapshot path.
 * @param bucketName bucket of the snapshot path.
 * @param snapshotManager manager used to load the snapshot instance.
 * @return the latest {@link OmSnapshot} for the path, or null when the
 *         path has no resolvable snapshot.
 * @throws IOException in case of a DB read error.
 */
private OmSnapshot getLatestSnapshot(String volumeName, String bucketName,
    OmSnapshotManager snapshotManager)
    throws IOException {

  String snapshotPath = volumeName + OM_KEY_PREFIX + bucketName;
  String latestPathSnapshot =
      snapshotChainManager.getLatestPathSnapshot(snapshotPath);
  if (latestPathSnapshot == null) {
    // No snapshot has ever been taken on this path.
    return null;
  }

  String snapTableKey = snapshotChainManager.getTableKey(latestPathSnapshot);
  if (snapTableKey == null) {
    return null;
  }

  SnapshotInfo snapInfo = getSnapshotInfoTable().get(snapTableKey);
  if (snapInfo == null) {
    return null;
  }

  return (OmSnapshot) snapshotManager.checkForSnapshot(volumeName,
      bucketName, getSnapshotPrefix(snapInfo.getName()));
}

@Override
public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
Expand Down Expand Up @@ -1786,4 +1860,30 @@ public long getBucketId(String volume, String bucket) throws IOException {
}
return omBucketInfo.getObjectID();
}

@Override
public List<BlockGroup> getBlocksForKeyDelete(String deletedKey)
    throws IOException {
  // A missing entry means the key is not pending deletion; signal with null.
  RepeatedOmKeyInfo deletedEntry = getDeletedTable().get(deletedKey);
  if (deletedEntry == null) {
    return null;
  }

  // Collect one BlockGroup per location-version group, across every key
  // instance queued under this deletedTable entry.
  List<BlockGroup> blockGroups = new ArrayList<>();
  for (OmKeyInfo keyInfo : deletedEntry.cloneOmKeyInfoList()) {
    for (OmKeyLocationInfoGroup locationGroup
        : keyInfo.getKeyLocationVersions()) {
      List<BlockID> blockIds = locationGroup.getLocationList().stream()
          .map(loc -> new BlockID(loc.getContainerID(), loc.getLocalID()))
          .collect(Collectors.toList());
      blockGroups.add(BlockGroup.newBuilder()
          .setKeyName(deletedKey)
          .addAllBlockIDs(blockIds)
          .build());
    }
  }
  return blockGroups;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class OmSnapshot implements IOmMetadataReader, Closeable {
private final String bucketName;
private final String snapshotName;
private final OMMetadataManager omMetadataManager;
private final KeyManager keyManager;

public OmSnapshot(KeyManager keyManager,
PrefixManager prefixManager,
Expand All @@ -81,6 +82,7 @@ public OmSnapshot(KeyManager keyManager,
this.snapshotName = snapshotName;
this.bucketName = bucketName;
this.volumeName = volumeName;
this.keyManager = keyManager;
this.omMetadataManager = keyManager.getMetadataManager();
}

Expand Down Expand Up @@ -135,7 +137,7 @@ public List<OmKeyInfo> listKeys(String vname, String bname,

@Override
public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
// TODO: handle denormalization
// TODO: [SNAPSHOT] handle denormalization
return omMetadataReader.getAcl(normalizeOzoneObj(obj));
}

Expand Down Expand Up @@ -262,4 +264,8 @@ public void close() throws IOException {
public OMMetadataManager getMetadataManager() {
return omMetadataManager;
}

/**
 * Returns the {@link KeyManager} this snapshot was constructed with.
 */
public KeyManager getKeyManager() {
return keyManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public final class OmSnapshotManager implements AutoCloseable {
// TODO: [SNAPSHOT] create config for max allowed page size.
private final int maxPageSize = 1000;

OmSnapshotManager(OzoneManager ozoneManager) {
public OmSnapshotManager(OzoneManager ozoneManager) {
this.options = new ManagedDBOptions();
this.options.setCreateIfMissing(true);
this.columnFamilyOptions = new ManagedColumnFamilyOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

package org.apache.hadoop.ozone.om.request.key;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
Expand All @@ -33,6 +38,8 @@

import java.util.List;

import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;

/**
* Handles purging of keys from OM DB.
*/
Expand All @@ -51,6 +58,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest();
List<DeletedKeys> bucketDeletedKeysList = purgeKeysRequest
.getDeletedKeysList();
OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ?
purgeKeysRequest.getSnapshotTableKey() : null;
List<String> keysToBePurgedList = new ArrayList<>();

OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
Expand All @@ -64,10 +74,27 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
}
}

omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
keysToBePurgedList);
addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
omDoubleBufferHelper);
try {
OmSnapshot omFromSnapshot = null;
if (fromSnapshot != null) {
SnapshotInfo snapshotInfo =
ozoneManager.getMetadataManager().getSnapshotInfoTable()
.get(fromSnapshot);
omFromSnapshot = (OmSnapshot) omSnapshotManager
.checkForSnapshot(snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(),
getSnapshotPrefix(snapshotInfo.getName()));
}

omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
keysToBePurgedList, omFromSnapshot);
} catch (IOException ex) {
omClientResponse = new OMKeyPurgeResponse(
createErrorOMResponse(omResponse, ex));
} finally {
addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
omDoubleBufferHelper);
}

return omClientResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.hadoop.ozone.om.response.key;

import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
Expand All @@ -36,19 +38,44 @@
@CleanupTableInfo(cleanupTables = {DELETED_TABLE})
public class OMKeyPurgeResponse extends OmKeyResponse {
private List<String> purgeKeyList;
private OmSnapshot fromSnapshot;

public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
@Nonnull List<String> keyList) {
@Nonnull List<String> keyList, OmSnapshot fromSnapshot) {
super(omResponse);
this.purgeKeyList = keyList;
this.fromSnapshot = fromSnapshot;
}

/**
 * Constructor for when the request is not successful; no key list or
 * snapshot is recorded. For a successful request, the other constructor
 * should be used.
 */
public OMKeyPurgeResponse(@Nonnull OMResponse omResponse) {
super(omResponse);
// NOTE(review): presumably rejects an OK status to prevent this
// constructor being used on a success path — confirm in OMClientResponse.
checkStatusNotOK();
}

@Override
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {

if (fromSnapshot != null) {
DBStore fromSnapshotStore = fromSnapshot.getMetadataManager().getStore();
// Init Batch Operation for snapshot db.
try (BatchOperation writeBatch = fromSnapshotStore.initBatchOperation()) {
processKeys(writeBatch, fromSnapshot.getMetadataManager());
fromSnapshotStore.commitBatchOperation(writeBatch);
}
} else {
processKeys(batchOperation, omMetadataManager);
}
}

private void processKeys(BatchOperation batchOp,
OMMetadataManager metadataManager) throws IOException {
for (String key : purgeKeyList) {
omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation,
metadataManager.getDeletedTable().deleteWithBatch(batchOp,
key);
}
}
Expand Down
Loading