@@ -26,9 +26,9 @@
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -180,25 +180,30 @@ public void updateModifcationTime() {
/**
* updates the length of each block in the given list.
* This will be called when the key is being committed to OzoneManager.
* Returns the uncommitted locationInfos to be deleted.
*
* @param locationInfoList list of locationInfo
* @return allocated but uncommitted locationInfos
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
boolean isMpu) {
updateLocationInfoList(locationInfoList, isMpu, false);
public List<OmKeyLocationInfo> updateLocationInfoList(
List<OmKeyLocationInfo> locationInfoList, boolean isMpu) {
return updateLocationInfoList(locationInfoList, isMpu, false);
}

/**
* updates the length of each block in the given list.
* This will be called when the key is being committed to OzoneManager.
* Returns the uncommitted locationInfos to be deleted.
*
* @param locationInfoList list of locationInfo
* @param isMpu true if this is a multipart key, false otherwise
* @param skipBlockIDCheck true if the blockId verification check
*        should be skipped, false if the verification is required
* @return allocated but uncommitted locationInfos
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
public List<OmKeyLocationInfo> updateLocationInfoList(
List<OmKeyLocationInfo> locationInfoList,
boolean isMpu, boolean skipBlockIDCheck) {
long latestVersion = getLatestVersionLocations().getVersion();
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
@@ -207,51 +212,68 @@ public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,

// Compare user given block location against allocatedBlockLocations
// present in OmKeyInfo.
List<OmKeyLocationInfo> uncommittedBlocks;
List<OmKeyLocationInfo> updatedBlockLocations;
if (skipBlockIDCheck) {
updatedBlockLocations = locationInfoList;
uncommittedBlocks = new ArrayList<>();
} else {
updatedBlockLocations =
Pair<List<OmKeyLocationInfo>, List<OmKeyLocationInfo>> verifiedResult =
verifyAndGetKeyLocations(locationInfoList, keyLocationInfoGroup);
updatedBlockLocations = verifiedResult.getLeft();
uncommittedBlocks = verifiedResult.getRight();
}
// Updates the latest locationList in the latest version only with
// given locationInfoList here.
// TODO: The original allocated list and the updated list here may vary
// as the containers on the Datanode on which the blocks were pre-allocated
// might get closed. The diff of blocks between these two lists here
// needs to be garbage collected in case the ozone client dies.

keyLocationInfoGroup.removeBlocks(latestVersion);
// set each of the locationInfo object to the latest version
updatedBlockLocations.forEach(omKeyLocationInfo -> omKeyLocationInfo
.setCreateVersion(latestVersion));
keyLocationInfoGroup.addAll(latestVersion, updatedBlockLocations);
}

private List<OmKeyLocationInfo> verifyAndGetKeyLocations(
List<OmKeyLocationInfo> locationInfoList,
OmKeyLocationInfoGroup keyLocationInfoGroup) {

List<OmKeyLocationInfo> allocatedBlockLocations =
keyLocationInfoGroup.getBlocksLatestVersionOnly();
List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();
return uncommittedBlocks;
}

List<ContainerBlockID> existingBlockIDs = new ArrayList<>();
for (OmKeyLocationInfo existingLocationInfo : allocatedBlockLocations) {
BlockID existingBlockID = existingLocationInfo.getBlockID();
existingBlockIDs.add(existingBlockID.getContainerBlockID());
/**
* 1. Verify committed KeyLocationInfos
* 2. Find out the allocated but uncommitted KeyLocationInfos.
*
* @param locationInfoList committed KeyLocationInfos
* @param keyLocationInfoGroup allocated KeyLocationInfoGroup
* @return Pair of updatedOmKeyLocationInfo and uncommittedOmKeyLocationInfo
*/
private Pair<List<OmKeyLocationInfo>, List<OmKeyLocationInfo>>
verifyAndGetKeyLocations(
List<OmKeyLocationInfo> locationInfoList,
OmKeyLocationInfoGroup keyLocationInfoGroup) {
// Only check ContainerBlockID here to avoid the mismatch of the pipeline
// field and BcsId in the OmKeyLocationInfo, as the OmKeyInfoCodec ignores
// the pipeline field by default and bcsId would be updated in Ratis mode.
Map<ContainerBlockID, OmKeyLocationInfo> allocatedBlockLocations =
new HashMap<>();
for (OmKeyLocationInfo existingLocationInfo : keyLocationInfoGroup.
getLocationList()) {
ContainerBlockID existingBlockID = existingLocationInfo.getBlockID().
getContainerBlockID();
// Overwriting an existing value should never happen here
allocatedBlockLocations.put(existingBlockID, existingLocationInfo);
}

List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();
for (OmKeyLocationInfo modifiedLocationInfo : locationInfoList) {
BlockID modifiedBlockID = modifiedLocationInfo.getBlockID();
if (existingBlockIDs.contains(modifiedBlockID.getContainerBlockID())) {
ContainerBlockID modifiedContainerBlockId =
modifiedLocationInfo.getBlockID().getContainerBlockID();
if (allocatedBlockLocations.containsKey(modifiedContainerBlockId)) {
updatedBlockLocations.add(modifiedLocationInfo);
allocatedBlockLocations.remove(modifiedContainerBlockId);
} else {
LOG.warn("Unknown BlockLocation:{}, where the blockID of given "
+ "location doesn't match with the stored/allocated block of"
+ " keyName:{}", modifiedLocationInfo, keyName);
}
}
return updatedBlockLocations;
List<OmKeyLocationInfo> uncommittedLocationInfos = new ArrayList<>(
allocatedBlockLocations.values());
return Pair.of(updatedBlockLocations, uncommittedLocationInfos);
}
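// A minimal, self-contained sketch of the reconciliation performed by
// verifyAndGetKeyLocations above: committed blocks are matched against the
// allocated ones by container/local id, and whatever allocation is left over
// is the uncommitted remainder handed to the delete path. BlockKey and Split
// are hypothetical stand-ins for ContainerBlockID and the commons-lang3 Pair;
// this is not the Ozone code itself.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class BlockReconcileSketch {

  record BlockKey(long containerId, long localId) { }

  record Split(List<BlockKey> committed, List<BlockKey> uncommitted) { }

  static Split reconcile(List<BlockKey> allocated, List<BlockKey> committed) {
    // Index every allocated block; whatever is still here at the end was
    // never committed by the client and must be queued for deletion.
    Map<BlockKey, BlockKey> pending = new HashMap<>();
    allocated.forEach(b -> pending.put(b, b));

    List<BlockKey> accepted = new ArrayList<>();
    for (BlockKey b : committed) {
      if (pending.remove(b) != null) {
        accepted.add(b);          // committed block matches an allocation
      }
      // else: unknown block; the real request logs a warning and drops it
    }
    return new Split(accepted, new ArrayList<>(pending.values()));
  }

  public static void main(String[] args) {
    List<BlockKey> allocated = List.of(
        new BlockKey(1, 1), new BlockKey(1, 2), new BlockKey(2, 3));
    List<BlockKey> committed = List.of(new BlockKey(1, 1), new BlockKey(2, 3));
    Split split = reconcile(allocated, committed);
    System.out.println("kept:      " + split.committed());   // blocks (1,1) and (2,3)
    System.out.println("to delete: " + split.uncommitted()); // block (1,2)
  }
}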

/**
@@ -218,8 +218,11 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
// Update the block length for each block
omKeyInfo.updateLocationInfoList(locationInfoList, false);
// Update the block length for each block and return the allocated but
// uncommitted blocks
List<OmKeyLocationInfo> uncommitted = omKeyInfo.updateLocationInfoList(
locationInfoList, false);

// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());

@@ -239,6 +242,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omBucketInfo.incrUsedNamespace(1L);
}

// Treat the uncommitted blocks as the key's old version blocks,
// which will be deleted as a RepeatedOmKeyInfo
OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
omKeyInfo);
if (pseudoKeyInfo != null) {
if (oldKeyVersionsToDelete != null) {
oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
} else {
oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
}
}

// Add to cache of open key table and key table.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
new CacheKey<>(dbOpenKey),
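// Hedged sketch of the step just added above: the leftovers returned by
// updateLocationInfoList are wrapped as a pseudo key and folded into the
// RepeatedOmKeyInfo that goes to the deleted table. The helper below is a
// hypothetical refactoring (assumed to live in a subclass of OMKeyRequest,
// which provides wrapUncommittedBlocksAsPseudoKey); it is not part of the PR.
private RepeatedOmKeyInfo queueUncommittedForDeletion(
    List<OmKeyLocationInfo> uncommitted, OmKeyInfo committedKeyInfo,
    RepeatedOmKeyInfo oldKeyVersionsToDelete) {
  OmKeyInfo pseudoKeyInfo =
      wrapUncommittedBlocksAsPseudoKey(uncommitted, committedKeyInfo);
  if (pseudoKeyInfo == null) {
    return oldKeyVersionsToDelete;       // nothing uncommitted, no change
  }
  if (oldKeyVersionsToDelete != null) {
    // Overwritten old key versions are already queued; the pseudo key rides
    // along in the same deleted-table entry.
    oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
    return oldKeyVersionsToDelete;
  }
  // Otherwise the deleted-table entry holds only the uncommitted blocks.
  return new RepeatedOmKeyInfo(pseudoKeyInfo);
}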
@@ -41,6 +41,8 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
@@ -58,6 +60,9 @@
*/
public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {

private static final Logger LOG =
LoggerFactory.getLogger(OMKeyCommitRequestWithFSO.class);

public OMKeyCommitRequestWithFSO(OMRequest omRequest,
BucketLayout bucketLayout) {
super(omRequest, bucketLayout);
@@ -144,10 +149,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());

// Update the block length for each block
List<OmKeyLocationInfo> allocatedLocationInfoList =
omKeyInfo.getLatestVersionLocations().getLocationList();
omKeyInfo.updateLocationInfoList(locationInfoList, false);
List<OmKeyLocationInfo> uncommitted = omKeyInfo.updateLocationInfoList(
locationInfoList, false);

// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
@@ -177,6 +180,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omBucketInfo.incrUsedNamespace(1L);
}

// Treat the uncommitted blocks as the key's old version blocks,
// which will be deleted as a RepeatedOmKeyInfo
OmKeyInfo pseudoKeyInfo = wrapUncommittedBlocksAsPseudoKey(uncommitted,
omKeyInfo);
if (pseudoKeyInfo != null) {
if (oldKeyVersionsToDelete != null) {
oldKeyVersionsToDelete.addOmKeyInfo(pseudoKeyInfo);
} else {
oldKeyVersionsToDelete = new RepeatedOmKeyInfo(pseudoKeyInfo);
}
}

// Add to cache of open key table and key table.
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, dbFileKey,
null, fileName, trxnLogIndex);
@@ -816,4 +816,28 @@ protected OzoneLockStrategy getOzoneLockStrategy(OzoneManager ozoneManager) {
return ozoneManager.getOzoneLockProvider()
.createLockStrategy(getBucketLayout());
}

/**
* Wrap the uncommitted blocks as pseudoKeyInfo.
*
* @param uncommitted uncommitted OmKeyLocationInfos
* @param omKeyInfo the committed key whose metadata is copied for the pseudo key
* @return pseudoKeyInfo
*/
protected OmKeyInfo wrapUncommittedBlocksAsPseudoKey(
List<OmKeyLocationInfo> uncommitted, OmKeyInfo omKeyInfo) {
if (uncommitted.isEmpty()) {
return null;
}
LOG.info("Detect allocated but uncommitted blocks {} in key {}.",
uncommitted, omKeyInfo.getKeyName());
OmKeyInfo pseudoKeyInfo = omKeyInfo.copyObject();
// TODO: dataSize of the pseudo key is not accurate here
List<OmKeyLocationInfoGroup> uncommittedGroups = new ArrayList<>();
// The version does not matter in the current logic of KeyDeletingService;
// all versions of blocks will be deleted.
uncommittedGroups.add(new OmKeyLocationInfoGroup(0, uncommitted));
pseudoKeyInfo.setKeyLocationVersions(uncommittedGroups);
return pseudoKeyInfo;
}
}
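// Why the pseudo key's location group can use version 0: a deletion worker in
// the style of KeyDeletingService (simplified, hypothetical sketch; long block
// ids stand in for OmKeyLocationInfo) walks every version group of a deleted
// key and purges every block in it, so the version number carries no meaning
// on the delete path.
import java.util.ArrayList;
import java.util.List;

public final class PurgeAllVersionsSketch {

  static List<Long> collectBlocksToPurge(List<List<Long>> locationVersions) {
    List<Long> blockIds = new ArrayList<>();
    for (List<Long> versionGroup : locationVersions) { // every version...
      blockIds.addAll(versionGroup);                   // ...every block in it
    }
    return blockIds;
  }

  public static void main(String[] args) {
    // One group at "version 0" holding the two uncommitted blocks is enough.
    List<List<Long>> versions = List.of(List.of(104L, 105L));
    System.out.println(collectBlocksToPurge(versions)); // [104, 105]
  }
}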
@@ -18,6 +18,7 @@

package org.apache.hadoop.ozone.om.response.key;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -104,6 +105,11 @@ protected String getOzoneKeyName() {
return ozoneKeyName;
}

@VisibleForTesting
public RepeatedOmKeyInfo getKeysToDelete() {
return keysToDelete;
}

protected void updateDeletedTable(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
if (this.keysToDelete != null) {
@@ -30,6 +30,7 @@
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
import org.apache.hadoop.util.Time;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
@@ -190,6 +191,93 @@ public void testValidateAndUpdateCache() throws Exception {
omKeyInfo.getLatestVersionLocations().getLocationList());
}

@Test
public void testValidateAndUpdateCacheWithUncommittedBlocks()
throws Exception {

// allocated block list
List<KeyLocation> allocatedKeyLocationList = getKeyLocation(5);

List<OmKeyLocationInfo> allocatedBlockList = allocatedKeyLocationList
.stream().map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

// committed block list, containing only three of the five allocated blocks
List<KeyLocation> committedKeyLocationList = getKeyLocation(3);

OMRequest modifiedOmRequest = doPreExecute(createCommitKeyRequest(
committedKeyLocationList));

OMKeyCommitRequest omKeyCommitRequest =
getOmKeyCommitRequest(modifiedOmRequest);

OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager, omKeyCommitRequest.getBucketLayout());

String ozoneKey = addKeyToOpenKeyTable(allocatedBlockList);

// Key should not be there in key table, as validateAndUpdateCache is
// still not called.
OmKeyInfo omKeyInfo =
omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
.get(ozoneKey);

Assert.assertNull(omKeyInfo);

OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager,
100L, ozoneManagerDoubleBufferHelper);

Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omClientResponse.getOMResponse().getStatus());

List<OmKeyInfo> toDeleteKeyList = ((OMKeyCommitResponse) omClientResponse).
getKeysToDelete().cloneOmKeyInfoList();

// This is the first commit of the key, so only the allocated but
// uncommitted blocks should be deleted.
Assert.assertEquals(1, toDeleteKeyList.size());
Assert.assertEquals(2, toDeleteKeyList.get(0).
getKeyLocationVersions().get(0).getLocationList().size());

// Entry should be deleted from openKey Table.
omKeyInfo =
omMetadataManager.getOpenKeyTable(omKeyCommitRequest.getBucketLayout())
.get(ozoneKey);
Assert.assertNull(omKeyInfo);

// Now entry should be created in key Table.
omKeyInfo =
omMetadataManager.getKeyTable(omKeyCommitRequest.getBucketLayout())
.get(ozoneKey);

Assert.assertNotNull(omKeyInfo);

// DB keyInfo format
verifyKeyName(omKeyInfo);

// Check modification time
CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest();
Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(),
omKeyInfo.getModificationTime());

// Check block location.
List<OmKeyLocationInfo> locationInfoListFromCommitKeyRequest =
commitKeyRequest.getKeyArgs()
.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

List<OmKeyLocationInfo> intersection = new ArrayList<>(allocatedBlockList);
intersection.retainAll(locationInfoListFromCommitKeyRequest);

// Key table should have three blocks.
Assert.assertEquals(intersection,
omKeyInfo.getLatestVersionLocations().getLocationList());
Assert.assertEquals(3, intersection.size());

}

@Test
public void testValidateAndUpdateCacheWithSubDirs() throws Exception {
parentDir = "dir1/dir2/dir3/";
@@ -466,15 +554,19 @@ private void verifyKeyArgs(KeyArgs originalKeyArgs, KeyArgs modifiedKeyArgs) {
modifiedKeyArgs.getFactor());
}

private OMRequest createCommitKeyRequest() {
return createCommitKeyRequest(getKeyLocation(5));
}

/**
* Create OMRequest which encapsulates CommitKeyRequest.
*/
private OMRequest createCommitKeyRequest() {
private OMRequest createCommitKeyRequest(List<KeyLocation> keyLocations) {
KeyArgs keyArgs =
KeyArgs.newBuilder().setDataSize(dataSize).setVolumeName(volumeName)
.setKeyName(keyName).setBucketName(bucketName)
.setType(replicationType).setFactor(replicationFactor)
.addAllKeyLocations(getKeyLocation(5)).build();
.addAllKeyLocations(keyLocations).build();

CommitKeyRequest commitKeyRequest =
CommitKeyRequest.newBuilder().setKeyArgs(keyArgs)