Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,8 @@ message DeletedKeys {

message PurgeKeysRequest {
repeated DeletedKeys deletedKeys = 1;
optional string fromSnapshot = 2;
// if not null, will purge keys in a snapshot DB instead of active DB
optional string snapshotTableKey = 2;
}

message PurgeKeysResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public OmSnapshot(KeyManager keyManager,
this.snapshotName = snapshotName;
this.bucketName = bucketName;
this.volumeName = volumeName;
this.omMetadataManager = keyManager.getMetadataManager();
this.keyManager = keyManager;
this.omMetadataManager = keyManager.getMetadataManager();
}


Expand Down Expand Up @@ -137,7 +137,7 @@ public List<OmKeyInfo> listKeys(String vname, String bname,

@Override
public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
// TODO: handle denormalization
// TODO: [SNAPSHOT] handle denormalization
return omMetadataReader.getAcl(normalizeOzoneObj(obj));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
List<DeletedKeys> bucketDeletedKeysList = purgeKeysRequest
.getDeletedKeysList();
OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
String fromSnapshot = purgeKeysRequest.hasFromSnapshot() ?
purgeKeysRequest.getFromSnapshot() : null;
String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ?
purgeKeysRequest.getSnapshotTableKey() : null;
List<String> keysToBePurgedList = new ArrayList<>();

OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
Expand Down Expand Up @@ -88,7 +88,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
keysToBePurgedList, omFromSnapshot);
} catch (IOException ex) {
} catch (IOException ex) {
omClientResponse = new OMKeyPurgeResponse(
createErrorOMResponse(omResponse, ex));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,
}
}

public void processKeys(BatchOperation batchOp,
private void processKeys(BatchOperation batchOp,
OMMetadataManager metadataManager) throws IOException {
for (String key : purgeKeyList) {
metadataManager.getDeletedTable().deleteWithBatch(batchOp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
Expand All @@ -31,7 +30,10 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
Expand Down Expand Up @@ -65,27 +67,29 @@ public AbstractKeyDeletingService(String serviceName, long interval,
this.ozoneManager = ozoneManager;
this.scmClient = scmClient;
this.runCount = new AtomicLong(0);

}

public int processKeyDeletes(List<BlockGroup> keyBlocksList,
KeyManager manager, long startTime, String snapTableKey)
throws IOException {
protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
KeyManager manager,
String snapTableKey) throws IOException {

long startTime = Time.monotonicNow();
int delCount = 0;
List<DeleteBlockGroupResult> results =
List<DeleteBlockGroupResult> blockDeletionResults =
scmClient.deleteKeyBlocks(keyBlocksList);
if (results != null) {
if (blockDeletionResults != null) {
if (isRatisEnabled()) {
delCount = submitPurgeKeysRequest(results, snapTableKey);
delCount = submitPurgeKeysRequest(blockDeletionResults, snapTableKey);
} else {
// TODO: Once HA and non-HA paths are merged, we should have
// only one code path here. Purge keys should go through an
// OMRequest model.
delCount = deleteAllKeys(results, manager);
delCount = deleteAllKeys(blockDeletionResults, manager);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
delCount, Time.monotonicNow() - startTime);
LOG.debug("Blocks for {} (out of {}) keys are deleted in {} ms",
delCount, blockDeletionResults.size(),
Time.monotonicNow() - startTime);
}
}
return delCount;
Expand Down Expand Up @@ -128,8 +132,8 @@ private int deleteAllKeys(List<DeleteBlockGroupResult> results,
* by SCM.
* @param results DeleteBlockGroups returned by SCM.
*/
public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
String snapTableKey) {
private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
String snapTableKey) {
Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
new HashMap<>();

Expand All @@ -148,30 +152,25 @@ public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
}
}


OzoneManagerProtocolProtos.PurgeKeysRequest.Builder purgeKeysRequest =
OzoneManagerProtocolProtos.PurgeKeysRequest.newBuilder();
PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
if (snapTableKey != null) {
purgeKeysRequest.setSnapshotTableKey(snapTableKey);
}

// Add keys to PurgeKeysRequest bucket wise.
for (Map.Entry<Pair<String, String>, List<String>> entry :
purgeKeysMapPerBucket.entrySet()) {
Pair<String, String> volumeBucketPair = entry.getKey();
OzoneManagerProtocolProtos.DeletedKeys deletedKeysInBucket =
OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
.setVolumeName(volumeBucketPair.getLeft())
.setBucketName(volumeBucketPair.getRight())
.addAllKeys(entry.getValue())
.build();
purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
}

if (snapTableKey != null) {
purgeKeysRequest.setFromSnapshot(snapTableKey);
}

OzoneManagerProtocolProtos.OMRequest omRequest =
OzoneManagerProtocolProtos.OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
OMRequest omRequest = OMRequest.newBuilder()
.setCmdType(Type.PurgeKeys)
.setPurgeKeysRequest(purgeKeysRequest)
.setClientId(clientId.toString())
.build();
Expand All @@ -191,7 +190,7 @@ public int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
}

private RaftClientRequest createRaftClientRequestForPurge(
OzoneManagerProtocolProtos.OMRequest omRequest) {
OMRequest omRequest) {
return RaftClientRequest.newBuilder()
.setClientId(clientId)
.setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
Expand Down Expand Up @@ -228,12 +227,6 @@ public boolean isRatisEnabled() {
return ozoneManager.isRatisEnabled();
}


@Override
public BackgroundTaskQueue getTasks() {
return null;
}

public OzoneManager getOzoneManager() {
return ozoneManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
Expand Down Expand Up @@ -65,8 +64,9 @@ public KeyDeletingService(OzoneManager ozoneManager,
ScmBlockLocationProtocol scmClient,
KeyManager manager, long serviceInterval,
long serviceTimeout, ConfigurationSource conf) {
super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
KEY_DELETING_CORE_POOL_SIZE, serviceTimeout, ozoneManager, scmClient);
super(KeyDeletingService.class.getSimpleName(), serviceInterval,
TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE,
serviceTimeout, ozoneManager, scmClient);
this.manager = manager;
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
Expand Down Expand Up @@ -119,7 +119,6 @@ public BackgroundTaskResult call() throws Exception {
if (shouldRun()) {
getRunCount().incrementAndGet();
try {
long startTime = Time.monotonicNow();
// TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in
// snapshot's deletedTable when active DB's deletedTable
// doesn't have enough entries left.
Expand All @@ -130,7 +129,7 @@ public BackgroundTaskResult call() throws Exception {
.getPendingDeletionKeys(keyLimitPerTask);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a separate timer inside getPendingDeletionKeys() later. Just a thought.

if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
int delCount = processKeyDeletes(keyBlocksList,
getOzoneManager().getKeyManager(), startTime, null);
getOzoneManager().getKeyManager(), null);
deletedKeyCount.addAndGet(delCount);
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
Expand All @@ -47,7 +48,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
Expand Down Expand Up @@ -231,7 +231,7 @@ public BackgroundTaskResult call() throws Exception {
.newBuilder();

for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) {
splitRepeatedOmKeyInfo(toReclaim, toNextDb,
splitRepeatedOmKeyInfo(toReclaim, toNextDb, renamedKey,
keyInfo, previousKeyTable, renamedKeyTable,
bucketInfo, volumeId);
}
Expand All @@ -251,19 +251,19 @@ public BackgroundTaskResult call() throws Exception {
keysToPurge.addAll(blocksForKeyDelete);
}
}
deletionCount++;

if (renamedKey.hasKey() && renamedKey.hasValue()) {
renamedKeysList.add(renamedKey.build());
}
deletionCount++;
}
// Submit Move request to OM.
submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
toNextDBList);
toNextDBList, renamedKeysList);

// Delete keys From deletedTable
long startTime = Time.monotonicNow();
processKeyDeletes(keysToPurge, omSnapshot.getKeyManager(),
startTime, snapInfo.getTableKey());
snapInfo.getTableKey());
snapshotLimit--;
successRunCount.incrementAndGet();
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private OMRequest createPurgeKeysRequest(List<String> deletedKeys,
.addDeletedKeys(deletedKeysInBucket);

if (snapshotDbKey != null) {
purgeKeysRequest.setFromSnapshot(snapshotDbKey);
purgeKeysRequest.setSnapshotTableKey(snapshotDbKey);
}
purgeKeysRequest.build();

Expand All @@ -128,7 +128,7 @@ private SnapshotInfo createSnapshot(String snapshotName) throws Exception {

// validateAndUpdateCache OMSnapshotCreateResponse.
OMSnapshotCreateResponse omClientResponse = (OMSnapshotCreateResponse)
omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 1,
omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 1L,
ozoneManagerDoubleBufferHelper);
// Add to batch and commit to DB.
omClientResponse.addToDBBatch(omMetadataManager, batchOperation);
Expand Down