diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index 43213a98490f..12aab5a1325a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -26,6 +26,8 @@ import java.util.Objects; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; @@ -34,6 +36,8 @@ import org.apache.hadoop.util.Time; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Args for key block. The block instance for the key requested in putKey. @@ -41,6 +45,7 @@ * datanode. Also, this is the metadata written to om.db on server side. */ public final class OmKeyInfo extends WithObjectID { + private static final Logger LOG = LoggerFactory.getLogger(OmKeyInfo.class); private final String volumeName; private final String bucketName; // name of key client specified @@ -147,10 +152,35 @@ public void updateModifcationTime() { */ public void updateLocationInfoList(List locationInfoList, boolean isMpu) { + updateLocationInfoList(locationInfoList, isMpu, false); + } + + /** + * updates the length of the each block in the list given. + * This will be called when the key is being committed to OzoneManager. + * + * @param locationInfoList list of locationInfo + * @param isMpu a true represents multi part key, false otherwise + * @param skipBlockIDCheck a true represents that the blockId verification + * check should be skipped, false represents that + * the blockId verification will be required + */ + public void updateLocationInfoList(List locationInfoList, + boolean isMpu, boolean skipBlockIDCheck) { long latestVersion = getLatestVersionLocations().getVersion(); OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations(); keyLocationInfoGroup.setMultipartKey(isMpu); + + // Compare user given block location against allocatedBlockLocations + // present in OmKeyInfo. + List updatedBlockLocations; + if (skipBlockIDCheck) { + updatedBlockLocations = locationInfoList; + } else { + updatedBlockLocations = + verifyAndGetKeyLocations(locationInfoList, keyLocationInfoGroup); + } // Updates the latest locationList in the latest version only with // given locationInfoList here. // TODO : The original allocated list and the updated list here may vary @@ -159,12 +189,37 @@ public void updateLocationInfoList(List locationInfoList, // need to be garbage collected in case the ozone client dies. keyLocationInfoGroup.removeBlocks(latestVersion); // set each of the locationInfo object to the latest version - locationInfoList.forEach(omKeyLocationInfo -> omKeyLocationInfo + updatedBlockLocations.forEach(omKeyLocationInfo -> omKeyLocationInfo .setCreateVersion(latestVersion)); - keyLocationInfoGroup.addAll(latestVersion, locationInfoList); + keyLocationInfoGroup.addAll(latestVersion, updatedBlockLocations); } + private List verifyAndGetKeyLocations( + List locationInfoList, + OmKeyLocationInfoGroup keyLocationInfoGroup) { + List allocatedBlockLocations = + keyLocationInfoGroup.getBlocksLatestVersionOnly(); + List updatedBlockLocations = new ArrayList<>(); + + List existingBlockIDs = new ArrayList<>(); + for (OmKeyLocationInfo existingLocationInfo : allocatedBlockLocations) { + BlockID existingBlockID = existingLocationInfo.getBlockID(); + existingBlockIDs.add(existingBlockID.getContainerBlockID()); + } + + for (OmKeyLocationInfo modifiedLocationInfo : locationInfoList) { + BlockID modifiedBlockID = modifiedLocationInfo.getBlockID(); + if (existingBlockIDs.contains(modifiedBlockID.getContainerBlockID())) { + updatedBlockLocations.add(modifiedLocationInfo); + } else { + LOG.warn("Unknown BlockLocation:{}, where the blockID of given " + + "location doesn't match with the stored/allocated block of" + + " keyName:{}", modifiedLocationInfo, keyName); + } + } + return updatedBlockLocations; + } /** * Append a set of blocks to the latest version. Note that these blocks are diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index 96a0f7c137e4..c73057979b96 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -760,9 +760,13 @@ public void testLookupKeyWithLocation() throws IOException { Pipeline pipeline = scm.getPipelineManager().createPipeline( ReplicationType.RATIS, ReplicationFactor.THREE, nodeList); List locationInfoList = new ArrayList<>(); + List locationList = + keySession.getKeyInfo().getLatestVersionLocations().getLocationList(); + Assert.assertEquals(1, locationList.size()); locationInfoList.add( new OmKeyLocationInfo.Builder().setPipeline(pipeline) - .setBlockID(new BlockID(1L, 1L)).build()); + .setBlockID(new BlockID(locationList.get(0).getContainerID(), + locationList.get(0).getLocalID())).build()); keyArgs.setLocationInfoList(locationInfoList); keyManager.commitKey(keyArgs, keySession.getId()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index f1336fc1171f..0b646c2f7c05 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -285,7 +285,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, // But now as versioning is not supported, just following the commit // key approach. When versioning support comes, then we can uncomment // below code keyInfo.addNewVersion(locations); - omKeyInfo.updateLocationInfoList(partLocationInfos, true); + omKeyInfo.updateLocationInfoList(partLocationInfos, true, true); omKeyInfo.setModificationTime(keyArgs.getModificationTime()); omKeyInfo.setDataSize(dataSize); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java index ff1f9c3af196..0a305eb66e75 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java @@ -104,6 +104,33 @@ public static void addKeyToTableAndCache(String volumeName, String bucketName, replicationType, replicationFactor, trxnLogIndex, omMetadataManager); } + /** + * Add key entry to KeyTable. if openKeyTable flag is true, add's entries + * to openKeyTable, else add's it to keyTable. + * @param openKeyTable + * @param volumeName + * @param bucketName + * @param keyName + * @param clientID + * @param replicationType + * @param replicationFactor + * @param omMetadataManager + * @param locationList + * @throws Exception + */ + @SuppressWarnings("parameterNumber") + public static void addKeyToTable(boolean openKeyTable, String volumeName, + String bucketName, String keyName, long clientID, + HddsProtos.ReplicationType replicationType, + HddsProtos.ReplicationFactor replicationFactor, + OMMetadataManager omMetadataManager, + List locationList) throws Exception { + addKeyToTable(openKeyTable, false, volumeName, bucketName, keyName, + clientID, replicationType, replicationFactor, 0L, omMetadataManager, + locationList); + } + + /** * Add key entry to KeyTable. if openKeyTable flag is true, add's entries * to openKeyTable, else add's it to keyTable. @@ -137,14 +164,34 @@ public static void addKeyToTable(boolean openKeyTable, boolean addToCache, String volumeName, String bucketName, String keyName, long clientID, HddsProtos.ReplicationType replicationType, HddsProtos.ReplicationFactor replicationFactor, long trxnLogIndex, - OMMetadataManager omMetadataManager) throws Exception { + OMMetadataManager omMetadataManager, + List locationList) throws Exception { OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName, replicationType, replicationFactor, trxnLogIndex); + omKeyInfo.appendNewBlocks(locationList, false); addKeyToTable(openKeyTable, addToCache, omKeyInfo, clientID, trxnLogIndex, omMetadataManager); + } + + /** + * Add key entry to KeyTable. if openKeyTable flag is true, add's entries + * to openKeyTable, else add's it to keyTable. + * @throws Exception + */ + @SuppressWarnings("parameternumber") + public static void addKeyToTable(boolean openKeyTable, boolean addToCache, + String volumeName, String bucketName, String keyName, long clientID, + HddsProtos.ReplicationType replicationType, + HddsProtos.ReplicationFactor replicationFactor, long trxnLogIndex, + OMMetadataManager omMetadataManager) throws Exception { + OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName, + replicationType, replicationFactor, trxnLogIndex); + + addKeyToTable(openKeyTable, addToCache, omKeyInfo, clientID, trxnLogIndex, + omMetadataManager); } /** diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java index b327b76e5136..f86442660a53 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyCommitRequest.java @@ -53,6 +53,64 @@ public void testPreExecute() throws Exception { doPreExecute(createCommitKeyRequest()); } + @Test + public void testValidateAndUpdateCacheWithUnknownBlockId() throws Exception { + + OMRequest modifiedOmRequest = + doPreExecute(createCommitKeyRequest()); + + OMKeyCommitRequest omKeyCommitRequest = + new OMKeyCommitRequest(modifiedOmRequest); + + // Append 3 blocks locations. + List allocatedLocationList = getKeyLocation(3) + .stream().map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, + omMetadataManager); + + TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName, + clientID, replicationType, replicationFactor, omMetadataManager, + allocatedLocationList); + + String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, + keyName); + + // Key should not be there in key table, as validateAndUpdateCache is + // still not called. + OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey); + + Assert.assertNull(omKeyInfo); + + OMClientResponse omClientResponse = + omKeyCommitRequest.validateAndUpdateCache(ozoneManager, + 100L, ozoneManagerDoubleBufferHelper); + + Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse.getOMResponse().getStatus()); + + // Entry should be deleted from openKey Table. + omKeyInfo = omMetadataManager.getOpenKeyTable().get(ozoneKey); + Assert.assertNull(omKeyInfo); + + // Now entry should be created in key Table. + omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey); + + Assert.assertNotNull(omKeyInfo); + + // Check modification time + + CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest(); + Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(), + omKeyInfo.getModificationTime()); + + // Check block location. + Assert.assertEquals(allocatedLocationList, + omKeyInfo.getLatestVersionLocations().getLocationList()); + + } + @Test public void testValidateAndUpdateCache() throws Exception { @@ -62,11 +120,21 @@ public void testValidateAndUpdateCache() throws Exception { OMKeyCommitRequest omKeyCommitRequest = new OMKeyCommitRequest(modifiedOmRequest); + + KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs(); + + // Append new blocks + List allocatedLocationList = + keyArgs.getKeyLocationsList().stream() + .map(OmKeyLocationInfo::getFromProtobuf) + .collect(Collectors.toList()); + TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager); TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName, - clientID, replicationType, replicationFactor, omMetadataManager); + clientID, replicationType, replicationFactor, omMetadataManager, + allocatedLocationList); String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName, keyName); @@ -107,6 +175,8 @@ public void testValidateAndUpdateCache() throws Exception { Assert.assertEquals(locationInfoListFromCommitKeyRequest, omKeyInfo.getLatestVersionLocations().getLocationList()); + Assert.assertEquals(allocatedLocationList, + omKeyInfo.getLatestVersionLocations().getLocationList()); } @@ -264,7 +334,7 @@ private OMRequest createCommitKeyRequest() { KeyArgs.newBuilder().setDataSize(dataSize).setVolumeName(volumeName) .setKeyName(keyName).setBucketName(bucketName) .setType(replicationType).setFactor(replicationFactor) - .addAllKeyLocations(getKeyLocation()).build(); + .addAllKeyLocations(getKeyLocation(5)).build(); CommitKeyRequest commitKeyRequest = CommitKeyRequest.newBuilder().setKeyArgs(keyArgs) @@ -279,10 +349,10 @@ private OMRequest createCommitKeyRequest() { /** * Create KeyLocation list. */ - private List getKeyLocation() { + private List getKeyLocation(int count) { List keyLocations = new ArrayList<>(); - for (int i=0; i < 5; i++) { + for (int i=0; i < count; i++) { KeyLocation keyLocation = KeyLocation.newBuilder() .setBlockID(HddsProtos.BlockID.newBuilder()