-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-13213. KeyDeletingService should limit task size by both key count and serialized size. #8757
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -710,17 +710,18 @@ public ListKeysResult listKeys(String volumeName, String bucketName, | |
|
|
||
| @Override | ||
| public PendingKeysDeletion getPendingDeletionKeys( | ||
| final CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, final int count) | ||
| throws IOException { | ||
| return getPendingDeletionKeys(null, null, null, filter, count); | ||
| final CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, final int count, | ||
| int ratisByteLimit) throws IOException { | ||
| return getPendingDeletionKeys(null, null, null, filter, count, ratisByteLimit); | ||
| } | ||
|
|
||
| @Override | ||
| public PendingKeysDeletion getPendingDeletionKeys( | ||
| String volume, String bucket, String startKey, | ||
| CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, | ||
| int count) throws IOException { | ||
| int count, int ratisByteLimit) throws IOException { | ||
| List<BlockGroup> keyBlocksList = Lists.newArrayList(); | ||
| long serializedSize = 0; | ||
| Map<String, RepeatedOmKeyInfo> keysToModify = new HashMap<>(); | ||
| // Bucket prefix would be empty if volume is empty i.e. either null or "". | ||
| Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false); | ||
|
|
@@ -741,6 +742,7 @@ public PendingKeysDeletion getPendingDeletionKeys( | |
| List<BlockGroup> blockGroupList = Lists.newArrayList(); | ||
| // Multiple keys with the same path can be queued in one DB entry | ||
| RepeatedOmKeyInfo infoList = kv.getValue(); | ||
| boolean flag = false; | ||
aryangupta1998 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| for (OmKeyInfo info : infoList.getOmKeyInfoList()) { | ||
|
|
||
| // Skip the key if the filter doesn't allow the file to be deleted. | ||
|
|
@@ -750,12 +752,23 @@ public PendingKeysDeletion getPendingDeletionKeys( | |
| .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))).collect(Collectors.toList()); | ||
| BlockGroup keyBlocks = BlockGroup.newBuilder().setKeyName(kv.getKey()) | ||
| .addAllBlockIDs(blockIDS).build(); | ||
| serializedSize += keyBlocks.getProto().getSerializedSize(); | ||
aryangupta1998 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (serializedSize > ratisByteLimit) { | ||
| flag = true; | ||
| LOG.info( | ||
| "Total size of cumulative keys in a cycle crossed 90% ratis limit, serialized size: {}", | ||
|
||
| serializedSize); | ||
| break; | ||
| } | ||
| blockGroupList.add(keyBlocks); | ||
| currentCount++; | ||
| } else { | ||
| notReclaimableKeyInfo.addOmKeyInfo(info); | ||
| } | ||
| } | ||
| if (flag) { | ||
| break; | ||
| } | ||
|
|
||
| List<OmKeyInfo> notReclaimableKeyInfoList = notReclaimableKeyInfo.getOmKeyInfoList(); | ||
|
|
||
|
|
@@ -775,8 +788,9 @@ private <V, R> List<KeyValue<String, R>> getTableEntries(String startKey, | |
| TableIterator<String, ? extends KeyValue<String, V>> tableIterator, | ||
| Function<V, R> valueFunction, | ||
| CheckedFunction<KeyValue<String, V>, Boolean, IOException> filter, | ||
| int size) throws IOException { | ||
| int size, int ratisLimit) throws IOException { | ||
| List<KeyValue<String, R>> entries = new ArrayList<>(); | ||
| int consumedSize = 0; | ||
| /* Seek to the start key if it's not null. The next key in queue is ensured to start with the bucket | ||
| prefix, {@link org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this. | ||
| */ | ||
|
|
@@ -789,8 +803,13 @@ private <V, R> List<KeyValue<String, R>> getTableEntries(String startKey, | |
| while (tableIterator.hasNext() && currentCount < size) { | ||
| KeyValue<String, V> kv = tableIterator.next(); | ||
| if (kv != null && filter.apply(kv)) { | ||
| entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()))); | ||
| consumedSize += kv.getValueByteSize(); | ||
| entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()), kv.getValueByteSize())); | ||
| currentCount++; | ||
| if (consumedSize > ratisLimit) { | ||
| LOG.info("Serialized size exceeded the ratis limit, current serailized size : {}", consumedSize); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| return entries; | ||
|
|
@@ -811,11 +830,12 @@ private Optional<String> getBucketPrefix(String volumeName, String bucketName, b | |
| @Override | ||
| public List<KeyValue<String, String>> getRenamesKeyEntries( | ||
| String volume, String bucket, String startKey, | ||
| CheckedFunction<KeyValue<String, String>, Boolean, IOException> filter, int size) throws IOException { | ||
| CheckedFunction<KeyValue<String, String>, Boolean, IOException> filter, int size, int ratisLimit) | ||
| throws IOException { | ||
| Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false); | ||
| try (TableIterator<String, ? extends KeyValue<String, String>> | ||
| renamedKeyIter = metadataManager.getSnapshotRenamedTable().iterator(bucketPrefix.orElse(""))) { | ||
| return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size); | ||
| return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size, ratisLimit); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -861,11 +881,11 @@ private <T> CheckedFunction<KeyManager, T, IOException> getPreviousSnapshotOzone | |
| public List<KeyValue<String, List<OmKeyInfo>>> getDeletedKeyEntries( | ||
| String volume, String bucket, String startKey, | ||
| CheckedFunction<KeyValue<String, RepeatedOmKeyInfo>, Boolean, IOException> filter, | ||
| int size) throws IOException { | ||
| int size, int ratisLimit) throws IOException { | ||
| Optional<String> bucketPrefix = getBucketPrefix(volume, bucket, false); | ||
| try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>> | ||
| delKeyIter = metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) { | ||
| return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size); | ||
| return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size, ratisLimit); | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.