Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1380,8 +1380,8 @@ message DeleteKeyResponse {
}

message DeletedKeys {
required string volumeName = 1;
required string bucketName = 2;
required string volumeName = 1 [deprecated = true];
required string bucketName = 2 [deprecated = true];
repeated string keys = 3;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -63,7 +64,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.util.Preconditions;

/**
* Abstracts common code from KeyDeletingService and DirectoryDeletingService
Expand Down Expand Up @@ -102,8 +102,7 @@ public AbstractKeyDeletingService(String serviceName, long interval,
}

protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
KeyManager manager,
HashMap<String, RepeatedOmKeyInfo> keysToModify,
Map<String, RepeatedOmKeyInfo> keysToModify,
String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException {

long startTime = Time.monotonicNow();
Expand Down Expand Up @@ -143,30 +142,33 @@ protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
* @param keysToModify Updated list of RepeatedOmKeyInfo
*/
private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
HashMap<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, UUID expectedPreviousSnapshotId) {
Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
new HashMap<>();
Map<String, RepeatedOmKeyInfo> keysToModify, String snapTableKey, UUID expectedPreviousSnapshotId) {
List<String> purgeKeys = new ArrayList<>();

// Put all keys to be purged in a list
int deletedCount = 0;
Set<String> failedDeletedKeys = new HashSet<>();
for (DeleteBlockGroupResult result : results) {
String deletedKey = result.getObjectKey();
if (result.isSuccess()) {
// Add key to PurgeKeys list.
String deletedKey = result.getObjectKey();
if (keysToModify != null && !keysToModify.containsKey(deletedKey)) {
// Parse Volume and BucketName
addToMap(purgeKeysMapPerBucket, deletedKey);
purgeKeys.add(deletedKey);
if (LOG.isDebugEnabled()) {
LOG.debug("Key {} set to be updated in OM DB, Other versions " +
"of the key that are reclaimable are reclaimed.", deletedKey);
}
} else if (keysToModify == null) {
addToMap(purgeKeysMapPerBucket, deletedKey);
purgeKeys.add(deletedKey);
if (LOG.isDebugEnabled()) {
LOG.debug("Key {} set to be purged from OM DB", deletedKey);
}
}
deletedCount++;
} else {
// If the block deletion failed, then the deleted keys should also not be modified.
failedDeletedKeys.add(deletedKey);
}
}

Expand All @@ -180,24 +182,20 @@ private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
}
purgeKeysRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());

// Add keys to PurgeKeysRequest bucket wise.
for (Map.Entry<Pair<String, String>, List<String>> entry :
purgeKeysMapPerBucket.entrySet()) {
Pair<String, String> volumeBucketPair = entry.getKey();
DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
.setVolumeName(volumeBucketPair.getLeft())
.setBucketName(volumeBucketPair.getRight())
.addAllKeys(entry.getValue())
.build();
purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
}
DeletedKeys deletedKeys = DeletedKeys.newBuilder()
.setVolumeName("")
.setBucketName("")
Comment on lines +186 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't simply ignore values for these two fields because they are required.

.addAllKeys(purgeKeys)
.build();
purgeKeysRequest.addDeletedKeys(deletedKeys);

List<SnapshotMoveKeyInfos> keysToUpdateList = new ArrayList<>();
if (keysToModify != null) {
for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
keysToModify.entrySet()) {

if (failedDeletedKeys.contains(keyToModify.getKey())) {
continue;
}
SnapshotMoveKeyInfos.Builder keyToUpdate =
SnapshotMoveKeyInfos.newBuilder();
keyToUpdate.setKey(keyToModify.getKey());
Expand Down Expand Up @@ -235,25 +233,6 @@ protected OzoneManagerProtocolProtos.OMResponse submitRequest(OMRequest omReques
return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet());
}

/**
* Parse Volume and Bucket Name from ObjectKey and add it to given map of
* keys to be purged per bucket.
*/
private void addToMap(Map<Pair<String, String>, List<String>> map, String objectKey) {
// Parse volume and bucket name
String[] split = objectKey.split(OM_KEY_PREFIX);
Preconditions.assertTrue(split.length >= 3, "Volume and/or Bucket Name " +
"missing from Key Name " + objectKey);
if (split.length == 3) {
LOG.warn("{} missing Key Name", objectKey);
}
Pair<String, String> volumeBucketPair = Pair.of(split[1], split[2]);
if (!map.containsKey(volumeBucketPair)) {
map.put(volumeBucketPair, new ArrayList<>());
}
map.get(volumeBucketPair).add(objectKey);
}

protected void submitPurgePaths(List<PurgePathRequest> requests,
String snapTableKey,
UUID expectedPreviousSnapshotId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public BackgroundTaskResult call() {
.getKeyBlocksList();
if (keyBlocksList != null && !keyBlocksList.isEmpty()) {
delCount = processKeyDeletes(keyBlocksList,
getOzoneManager().getKeyManager(),
pendingKeysDeletion.getKeysToModify(), null, expectedPreviousSnapshotId);
deletedKeyCount.addAndGet(delCount);
metrics.incrNumKeysProcessed(keyBlocksList.size());
Expand Down Expand Up @@ -447,7 +446,7 @@ private void processSnapshotDeepClean(int delCount)
}

if (!keysToPurge.isEmpty()) {
processKeyDeletes(keysToPurge, currOmSnapshot.getKeyManager(),
processKeyDeletes(keysToPurge,
keysToModify, currSnapInfo.getTableKey(),
Optional.ofNullable(previousSnapshot).map(SnapshotInfo::getSnapshotId).orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
Expand All @@ -46,7 +51,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -78,20 +85,24 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.OzoneTestBase;
import org.apache.ratis.util.ExitUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -634,6 +645,38 @@ void cleanup() {
}
}

@Test
@DisplayName("Should not update keys when purge request times out during key deletion")
public void testFailingModifiedKeyPurge() throws IOException {

try (MockedStatic<OzoneManagerRatisUtils> mocked = mockStatic(OzoneManagerRatisUtils.class,
CALLS_REAL_METHODS)) {
AtomicReference<OzoneManagerProtocolProtos.OMRequest> purgeRequest = new AtomicReference<>();
mocked.when(() -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
.thenAnswer(i -> {
purgeRequest.set(i.getArgument(1));
return OzoneManagerProtocolProtos.OMResponse.newBuilder().setCmdType(purgeRequest.get().getCmdType())
.setStatus(OzoneManagerProtocolProtos.Status.TIMEOUT).build();
});
List<BlockGroup> blockGroups = Collections.singletonList(BlockGroup.newBuilder().setKeyName("key1")
.addAllBlockIDs(Collections.singletonList(new BlockID(1, 1))).build());
OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
.setBucketName("buck")
.setVolumeName("vol")
.setKeyName("key1")
.setDataSize(10)
.setOmKeyLocationInfos(null)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setObjectID(1)
.setParentObjectID(2)
.build();
Map<String, RepeatedOmKeyInfo> keysToModify = Collections.singletonMap("key1",
new RepeatedOmKeyInfo(Collections.singletonList(omKeyInfo)));
keyDeletingService.processKeyDeletes(blockGroups, keysToModify, null, null);
assertTrue(purgeRequest.get().getPurgeKeysRequest().getKeysToUpdateList().isEmpty());
}
}

@Test
void checkIfDeleteServiceWithFailingSCM() throws Exception {
final int initialCount = countKeysPendingDeletion();
Expand Down