diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 9b2014dd7b84..f8f589af2efc 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -26,9 +26,9 @@ import java.util.Objects; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; -import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; @@ -180,25 +180,30 @@ public void updateModifcationTime() { /** * updates the length of the each block in the list given. * This will be called when the key is being committed to OzoneManager. + * Return the uncommitted locationInfo to be deleted. * * @param locationInfoList list of locationInfo + * @return allocated but uncommitted locationInfos */ - public void updateLocationInfoList(List locationInfoList, - boolean isMpu) { - updateLocationInfoList(locationInfoList, isMpu, false); + public List updateLocationInfoList( + List locationInfoList, boolean isMpu) { + return updateLocationInfoList(locationInfoList, isMpu, false); } /** * updates the length of the each block in the list given. * This will be called when the key is being committed to OzoneManager. + * Return the uncommitted locationInfo to be deleted. * * @param locationInfoList list of locationInfo * @param isMpu a true represents multi part key, false otherwise * @param skipBlockIDCheck a true represents that the blockId verification * check should be skipped, false represents that * the blockId verification will be required + * @return allocated but uncommitted locationInfos */ - public void updateLocationInfoList(List locationInfoList, + public List updateLocationInfoList( + List locationInfoList, boolean isMpu, boolean skipBlockIDCheck) { long latestVersion = getLatestVersionLocations().getVersion(); OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); @@ -207,51 +212,68 @@ public void updateLocationInfoList(List locationInfoList, // Compare user given block location against allocatedBlockLocations // present in OmKeyInfo. + List uncommittedBlocks; List updatedBlockLocations; if (skipBlockIDCheck) { updatedBlockLocations = locationInfoList; + uncommittedBlocks = new ArrayList<>(); } else { - updatedBlockLocations = + Pair, List> verifiedResult = verifyAndGetKeyLocations(locationInfoList, keyLocationInfoGroup); + updatedBlockLocations = verifiedResult.getLeft(); + uncommittedBlocks = verifiedResult.getRight(); } - // Updates the latest locationList in the latest version only with - // given locationInfoList here. - // TODO : The original allocated list and the updated list here may vary - // as the containers on the Datanode on which the blocks were pre allocated - // might get closed. The diff of blocks between these two lists here - // need to be garbage collected in case the ozone client dies. + keyLocationInfoGroup.removeBlocks(latestVersion); // set each of the locationInfo object to the latest version updatedBlockLocations.forEach(omKeyLocationInfo -> omKeyLocationInfo .setCreateVersion(latestVersion)); keyLocationInfoGroup.addAll(latestVersion, updatedBlockLocations); - } - private List verifyAndGetKeyLocations( - List locationInfoList, - OmKeyLocationInfoGroup keyLocationInfoGroup) { - - List allocatedBlockLocations = - keyLocationInfoGroup.getBlocksLatestVersionOnly(); - List updatedBlockLocations = new ArrayList<>(); + return uncommittedBlocks; + } - List existingBlockIDs = new ArrayList<>(); - for (OmKeyLocationInfo existingLocationInfo : allocatedBlockLocations) { - BlockID existingBlockID = existingLocationInfo.getBlockID(); - existingBlockIDs.add(existingBlockID.getContainerBlockID()); + /** + * 1. Verify committed KeyLocationInfos + * 2. Find out the allocated but uncommitted KeyLocationInfos. + * + * @param locationInfoList committed KeyLocationInfos + * @param keyLocationInfoGroup allocated KeyLocationInfoGroup + * @return Pair of updatedOmKeyLocationInfo and uncommittedOmKeyLocationInfo + */ + private Pair, List> + verifyAndGetKeyLocations( + List locationInfoList, + OmKeyLocationInfoGroup keyLocationInfoGroup) { + // Only check ContainerBlockID here to avoid the mismatch of the pipeline + // field and BcsId in the OmKeyLocationInfo, as the OmKeyInfoCodec ignores + // the pipeline field by default and bcsId would be updated in Ratis mode. + Map allocatedBlockLocations = + new HashMap<>(); + for (OmKeyLocationInfo existingLocationInfo : keyLocationInfoGroup. + getLocationList()) { + ContainerBlockID existingBlockID = existingLocationInfo.getBlockID(). + getContainerBlockID(); + // The case of overwriting value should never happen + allocatedBlockLocations.put(existingBlockID, existingLocationInfo); } + List updatedBlockLocations = new ArrayList<>(); for (OmKeyLocationInfo modifiedLocationInfo : locationInfoList) { - BlockID modifiedBlockID = modifiedLocationInfo.getBlockID(); - if (existingBlockIDs.contains(modifiedBlockID.getContainerBlockID())) { + ContainerBlockID modifiedContainerBlockId = + modifiedLocationInfo.getBlockID().getContainerBlockID(); + if (allocatedBlockLocations.containsKey(modifiedContainerBlockId)) { updatedBlockLocations.add(modifiedLocationInfo); + allocatedBlockLocations.remove(modifiedContainerBlockId); } else { LOG.warn("Unknown BlockLocation:{}, where the blockID of given " + "location doesn't match with the stored/allocated block of" + " keyName:{}", modifiedLocationInfo, keyName); } } - return updatedBlockLocations; + List uncommittedLocationInfos = new ArrayList<>( + allocatedBlockLocations.values()); + return Pair.of(updatedBlockLocations, uncommittedLocationInfos); } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java index ebff5416f364..bb070b6ba11b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java @@ -218,8 +218,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName); omKeyInfo.setDataSize(commitKeyArgs.getDataSize()); omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime()); - // Update the block length for each block - omKeyInfo.updateLocationInfoList(locationInfoList, false); + // Update the block length for each block, return the allocated but + // uncommitted blocks + List uncommitted = omKeyInfo.updateLocationInfoList( + locationInfoList, false); + // Set the UpdateID to current transactionLogIndex omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); @@ -239,6 +242,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, omBucketInfo.incrUsedNamespace(1L); } + // let the uncommitted blocks pretend as key's old version blocks + // which will be deleted as RepeatedOmKeyInfo + OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted, + omKeyInfo); + if (pseudoKeyInfo != null) { + if (oldKeyVersionsToDelete != null) { + oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo); + } else { + oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo); + } + } + // Add to cache of open key table and key table. omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry( new CacheKey<>(dbOpenKey), diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java index b9239c9d8648..5c7ac450b8f3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java @@ -41,6 +41,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Path; @@ -58,6 +60,9 @@ */ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest { + private static final Logger LOG = + LoggerFactory.getLogger(OMKeyCommitRequestWithFSO.class); + public OMKeyCommitRequestWithFSO(OMRequest omRequest, BucketLayout bucketLayout) { super(omRequest, bucketLayout); @@ -144,10 +149,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime()); - // Update the block length for each block - List allocatedLocationInfoList = - omKeyInfo.getLatestVersionLocations().getLocationList(); - omKeyInfo.updateLocationInfoList(locationInfoList, false); + List uncommitted = omKeyInfo.updateLocationInfoList( + locationInfoList, false); // Set the UpdateID to current transactionLogIndex omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled()); @@ -177,6 +180,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, omBucketInfo.incrUsedNamespace(1L); } + // let the uncommitted blocks pretend as key's old version blocks + // which will be deleted as RepeatedOmKeyInfo + OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted, + omKeyInfo); + if (pseudoKeyInfo != null) { + if (oldKeyVersionsToDelete != null) { + oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo); + } else { + oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo); + } + } + // Add to cache of open key table and key table. OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, dbFileKey, null, fileName, trxnLogIndex); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java index 904b5c000592..8c79e16dcd94 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java @@ -816,4 +816,28 @@ protected OzoneLockStrategy getOzoneLockStrategy(OzoneManager ozoneManager) { return ozoneManager.getOzoneLockProvider() .createLockStrategy(getBucketLayout()); } + + /** + * Wrap the uncommitted blocks as pseudoKeyInfo. + * + * @param uncommitted Uncommitted OmKeyLocationInfo + * @param omKeyInfo Args for key block + * @return pseudoKeyInfo + */ + protected OmKeyInfo wrapUncommittedBlocksAsPseudoKey( + List uncommitted, OmKeyInfo omKeyInfo) { + if (uncommitted.isEmpty()) { + return null; + } + LOG.info("Detect allocated but uncommitted blocks {} in key {}.", + uncommitted, omKeyInfo.getKeyName()); + OmKeyInfo pseudoKeyInfo = omKeyInfo.copyObject(); + // TODO dataSize of pseudoKey is not real here + List uncommittedGroups = new ArrayList<>(); + // version not matters in the current logic of keyDeletingService, + // all versions of blocks will be deleted. + uncommittedGroups.add(new OmKeyLocationInfoGroup(0, uncommitted)); + pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups); + return pseudoKeyInfo; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java index 513e10cba46a..911a61cce63c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.response.key; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -104,6 +105,11 @@ protected String getOzoneKeyName() { return ozoneKeyName; } + @VisibleForTesting + public RepeatedOmKeyInfo getKeysToDelete() { + return keysToDelete; + } + protected void updateDeletedTable(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { if (this.keysToDelete != null) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java index bdf16fef42a4..10552e380d18 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse; import org.apache.hadoop.util.Time; import org.jetbrains.annotations.NotNull; import org.junit.Assert; @@ -190,6 +191,93 @@ public void testValidateAndUpdateCache() throws Exception { omKeyInfo.getLatestVersionLocations().getLocationList()); } + @Test + public void testValidateAndUpdateCacheWithUncommittedBlocks() + throws Exception { + + // allocated block list + List allocatedKeyLocationList = getKeyLocation(5); + + List allocatedBlockList = allocatedKeyLocationList + .stream().map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + + // committed block list, with three blocks different with the allocated + List committedKeyLocationList = getKeyLocation(3); + + OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest( + committedKeyLocationList)); + + OMKeyCommitRequest omKeyCommitRequest = + getOmKeyCommitRequest(modifiedOmRequest); + + OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager, omKeyCommitRequest.getBucketLayout()); + + String ozoneKey = addKeyToOpenKeyTable(allocatedBlockList); + + // Key should not be there in key table, as validateAndUpdateCache is + // still not called. + OmKeyInfo omKeyInfo = + omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout()) + .get(ozoneKey); + + Assert.assertNull(omKeyInfo); + + OMClientResponse omClientResponse = + omKeyCommitRequest.validateAndUpdateCache(ozoneManager, + 100L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + + List toDeleteKeyList = ((OMKeyCommitResponse) omClientResponse). + getKeysToDelete().cloneOmKeyInfoList(); + + // This is the first time to commit key, only the allocated but uncommitted + // blocks should be deleted. + Assert.assertEquals(1, toDeleteKeyList.size()); + Assert.assertEquals(2, toDeleteKeyList.get(0). + getKeyLocationVersions().get(0).getLocationList().size()); + + // Entry should be deleted from openKey Table. + omKeyInfo = + omMetadataManager.getOpenKeyTable(omKeyCommitRequest.getBucketLayout()) + .get(ozoneKey); + Assert.assertNull(omKeyInfo); + + // Now entry should be created in key Table. + omKeyInfo = + omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout()) + .get(ozoneKey); + + Assert.assertNotNull(omKeyInfo); + + // DB keyInfo format + verifyKeyName(omKeyInfo); + + // Check modification time + CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest(); + Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(), + omKeyInfo.getModificationTime()); + + // Check block location. + List locationInfoListFromCommitKeyRequest = + commitKeyRequest.getKeyArgs() + .getKeyLocationsList().stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + + List intersection = new ArrayList<>(allocatedBlockList); + intersection.retainAll(locationInfoListFromCommitKeyRequest); + + // Key table should have three blocks. + Assert.assertEquals(intersection, + omKeyInfo.getLatestVersionLocations().getLocationList()); + Assert.assertEquals(3, intersection.size()); + + } + @Test public void testValidateAndUpdateCacheWithSubDirs() throws Exception { parentDir = "dir1/dir2/dir3/"; @@ -466,15 +554,19 @@ private void verifyKeyArgs(KeyArgs originalKeyArgs, KeyArgs modifiedKeyArgs) { modifiedKeyArgs.getFactor()); } + private OMRequest createCommitKeyRequest() { + return createCommitKeyRequest(getKeyLocation(5)); + } + /** * Create OMRequest which encapsulates CommitKeyRequest. */ - private OMRequest createCommitKeyRequest() { + private OMRequest createCommitKeyRequest(List keyLocations) { KeyArgs keyArgs = KeyArgs.newBuilder().setDataSize(dataSize).setVolumeName(volumeName) .setKeyName(keyName).setBucketName(bucketName) .setType(replicationType).setFactor(replicationFactor) - .addAllKeyLocations(getKeyLocation(5)).build(); + .addAllKeyLocations(keyLocations).build(); CommitKeyRequest commitKeyRequest = CommitKeyRequest.newBuilder().setKeyArgs(keyArgs) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index a24a72ac80ba..3bc35c8b3b82 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -22,14 +22,21 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OmTestManagers; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; import org.apache.ratis.util.ExitUtils; import org.junit.BeforeClass; @@ -133,8 +140,8 @@ public void checkIfDeleteServiceIsDeletingKeys() () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount, 1000, 10000); Assert.assertTrue(keyDeletingService.getRunCount().get() > 1); - Assert.assertEquals( - keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), 0); + Assert.assertEquals(0, + keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size()); } @Test(timeout = 40000) @@ -176,9 +183,9 @@ public void checkIfDeleteServiceWithFailingSCM() () -> keyDeletingService.getRunCount().get() >= 5, 100, 1000); // Since SCM calls are failing, deletedKeyCount should be zero. - Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0); - Assert.assertEquals( - keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), keyCount); + Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get()); + Assert.assertEquals(keyCount, + keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size()); } @Test(timeout = 30000) @@ -200,15 +207,86 @@ public void checkDeletionForEmptyKey() KeyDeletingService keyDeletingService = (KeyDeletingService) keyManager.getDeletingService(); - // Since empty keys are directly deleted from db there should be no - // pending deletion keys. Also deletedKeyCount should be zero. - Assert.assertEquals( - keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size(), 0); + // the pre-allocated blocks are not committed, hence they will be deleted. + Assert.assertEquals(100, + keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size()); // Make sure that we have run the background thread 2 times or more GenericTestUtils.waitFor( () -> keyDeletingService.getRunCount().get() >= 2, 100, 1000); - Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0); + // the blockClient is set to fail the deletion of key blocks, hence no keys + // will be deleted + Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get()); + } + + @Test(timeout = 30000) + public void checkDeletionForPartiallyCommitKey() + throws IOException, TimeoutException, InterruptedException, + AuthenticationException { + OzoneConfiguration conf = createConfAndInitValues(); + ScmBlockLocationProtocol blockClient = + //failCallsFrequency = 1 , means all calls fail. + new ScmBlockLocationTestingClient(null, null, 1); + OmTestManagers omTestManagers + = new OmTestManagers(conf, blockClient, null); + KeyManager keyManager = omTestManagers.getKeyManager(); + writeClient = omTestManagers.getWriteClient(); + om = omTestManagers.getOzoneManager(); + + String volumeName = String.format("volume%s", + RandomStringUtils.randomAlphanumeric(5)); + String bucketName = String.format("bucket%s", + RandomStringUtils.randomAlphanumeric(5)); + String keyName = String.format("key%s", + RandomStringUtils.randomAlphanumeric(5)); + + // Create Volume and Bucket + createVolumeAndBucket(keyManager, volumeName, bucketName, false); + + OmKeyArgs keyArg = createAndCommitKey(keyManager, volumeName, bucketName, + keyName, 3, 1); + + // Only the uncommitted block should be pending to be deleted. + GenericTestUtils.waitFor( + () -> { + try { + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + .stream() + .map(BlockGroup::getBlockIDList) + .flatMap(Collection::stream) + .collect(Collectors.toList()).size() == 1; + } catch (IOException e) { + e.printStackTrace(); + } + return false; + }, + 500, 3000); + + // Delete the key + writeClient.deleteKey(keyArg); + + KeyDeletingService keyDeletingService = + (KeyDeletingService) keyManager.getDeletingService(); + + // All blocks should be pending to be deleted. + GenericTestUtils.waitFor( + () -> { + try { + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + .stream() + .map(BlockGroup::getBlockIDList) + .flatMap(Collection::stream) + .collect(Collectors.toList()).size() == 3; + } catch (IOException e) { + e.printStackTrace(); + } + return false; + }, + 500, 3000); + + // the blockClient is set to fail the deletion of key blocks, hence no keys + // will be deleted + Assert.assertEquals(0, keyDeletingService.getDeletedKeyCount().get()); } @Test(timeout = 30000) @@ -299,6 +377,15 @@ private void createVolumeAndBucket(KeyManager keyManager, String volumeName, private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName, String bucketName, String keyName, int numBlocks) throws IOException { + return createAndCommitKey(keyManager, volumeName, bucketName, keyName, + numBlocks, 0); + } + + private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName, + String bucketName, String keyName, int numBlocks, int numUncommitted) + throws IOException { + // Even if no key size is appointed, there will be at least one + // block pre-allocated when key is created OmKeyArgs keyArg = new OmKeyArgs.Builder() .setVolumeName(volumeName) @@ -311,10 +398,35 @@ private OmKeyArgs createAndCommitKey(KeyManager keyManager, String volumeName, .build(); //Open and Commit the Key in the Key Manager. OpenKeySession session = writeClient.openKey(keyArg); - for (int i = 0; i < numBlocks; i++) { - keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(), + + // add pre-allocated blocks into args and avoid creating excessive block + OmKeyLocationInfoGroup keyLocationVersions = session.getKeyInfo(). + getLatestVersionLocations(); + assert keyLocationVersions != null; + List latestBlocks = keyLocationVersions. + getBlocksLatestVersionOnly(); + int preAllocatedSize = latestBlocks.size(); + for (OmKeyLocationInfo block : latestBlocks) { + keyArg.addLocationInfo(block); + } + + // allocate blocks until the blocks num equal to numBlocks + LinkedList allocated = new LinkedList<>(); + for (int i = 0; i < numBlocks - preAllocatedSize; i++) { + allocated.add(writeClient.allocateBlock(keyArg, session.getId(), new ExcludeList())); } + + // remove the blocks not to be committed + for (int i = 0; i < numUncommitted; i++) { + allocated.removeFirst(); + } + + // add the blocks to be committed + for (OmKeyLocationInfo block: allocated) { + keyArg.addLocationInfo(block); + } + writeClient.commitKey(keyArg, session.getId()); return keyArg; }