Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Objects;

import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
Expand All @@ -34,13 +36,16 @@
import org.apache.hadoop.util.Time;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Args for key block. The block instance for the key requested in putKey.
* This is returned from OM to client, and client use class to talk to
* datanode. Also, this is the metadata written to om.db on server side.
*/
public final class OmKeyInfo extends WithObjectID {
private static final Logger LOG = LoggerFactory.getLogger(OmKeyInfo.class);
private final String volumeName;
private final String bucketName;
// name of key client specified
Expand Down Expand Up @@ -147,10 +152,35 @@ public void updateModifcationTime() {
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
boolean isMpu) {
updateLocationInfoList(locationInfoList, isMpu, false);
}

/**
* updates the length of the each block in the list given.
* This will be called when the key is being committed to OzoneManager.
*
* @param locationInfoList list of locationInfo
* @param isMpu a true represents multi part key, false otherwise
* @param skipBlockIDCheck a true represents that the blockId verification
* check should be skipped, false represents that
* the blockId verification will be required
*/
public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
boolean isMpu, boolean skipBlockIDCheck) {
long latestVersion = getLatestVersionLocations().getVersion();
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();

keyLocationInfoGroup.setMultipartKey(isMpu);

// Compare user given block location against allocatedBlockLocations
// present in OmKeyInfo.
List<OmKeyLocationInfo> updatedBlockLocations;
if (skipBlockIDCheck) {
updatedBlockLocations = locationInfoList;
} else {
updatedBlockLocations =
verifyAndGetKeyLocations(locationInfoList, keyLocationInfoGroup);
}
// Updates the latest locationList in the latest version only with
// given locationInfoList here.
// TODO : The original allocated list and the updated list here may vary
Expand All @@ -159,12 +189,37 @@ public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList,
// need to be garbage collected in case the ozone client dies.
keyLocationInfoGroup.removeBlocks(latestVersion);
// set each of the locationInfo object to the latest version
locationInfoList.forEach(omKeyLocationInfo -> omKeyLocationInfo
updatedBlockLocations.forEach(omKeyLocationInfo -> omKeyLocationInfo
.setCreateVersion(latestVersion));
keyLocationInfoGroup.addAll(latestVersion, locationInfoList);
keyLocationInfoGroup.addAll(latestVersion, updatedBlockLocations);
}

private List<OmKeyLocationInfo> verifyAndGetKeyLocations(
List<OmKeyLocationInfo> locationInfoList,
OmKeyLocationInfoGroup keyLocationInfoGroup) {

List<OmKeyLocationInfo> allocatedBlockLocations =
keyLocationInfoGroup.getBlocksLatestVersionOnly();
List<OmKeyLocationInfo> updatedBlockLocations = new ArrayList<>();

List<ContainerBlockID> existingBlockIDs = new ArrayList<>();
for (OmKeyLocationInfo existingLocationInfo : allocatedBlockLocations) {
BlockID existingBlockID = existingLocationInfo.getBlockID();
existingBlockIDs.add(existingBlockID.getContainerBlockID());
}

for (OmKeyLocationInfo modifiedLocationInfo : locationInfoList) {
BlockID modifiedBlockID = modifiedLocationInfo.getBlockID();
if (existingBlockIDs.contains(modifiedBlockID.getContainerBlockID())) {
updatedBlockLocations.add(modifiedLocationInfo);
} else {
LOG.warn("Unknown BlockLocation:{}, where the blockID of given "
+ "location doesn't match with the stored/allocated block of"
+ " keyName:{}", modifiedLocationInfo, keyName);
}
}
return updatedBlockLocations;
}

/**
* Append a set of blocks to the latest version. Note that these blocks are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,9 +760,13 @@ public void testLookupKeyWithLocation() throws IOException {
Pipeline pipeline = scm.getPipelineManager().createPipeline(
ReplicationType.RATIS, ReplicationFactor.THREE, nodeList);
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
List<OmKeyLocationInfo> locationList =
keySession.getKeyInfo().getLatestVersionLocations().getLocationList();
Assert.assertEquals(1, locationList.size());
locationInfoList.add(
new OmKeyLocationInfo.Builder().setPipeline(pipeline)
.setBlockID(new BlockID(1L, 1L)).build());
.setBlockID(new BlockID(locationList.get(0).getContainerID(),
locationList.get(0).getLocalID())).build());
keyArgs.setLocationInfoList(locationInfoList);

keyManager.commitKey(keyArgs, keySession.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
// But now as versioning is not supported, just following the commit
// key approach. When versioning support comes, then we can uncomment
// below code keyInfo.addNewVersion(locations);
omKeyInfo.updateLocationInfoList(partLocationInfos, true);
omKeyInfo.updateLocationInfoList(partLocationInfos, true, true);
omKeyInfo.setModificationTime(keyArgs.getModificationTime());
omKeyInfo.setDataSize(dataSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,33 @@ public static void addKeyToTableAndCache(String volumeName, String bucketName,
replicationType, replicationFactor, trxnLogIndex, omMetadataManager);
}

/**
* Add key entry to KeyTable. if openKeyTable flag is true, add's entries
* to openKeyTable, else add's it to keyTable.
* @param openKeyTable
* @param volumeName
* @param bucketName
* @param keyName
* @param clientID
* @param replicationType
* @param replicationFactor
* @param omMetadataManager
* @param locationList
* @throws Exception
*/
@SuppressWarnings("parameterNumber")
public static void addKeyToTable(boolean openKeyTable, String volumeName,
String bucketName, String keyName, long clientID,
HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor replicationFactor,
OMMetadataManager omMetadataManager,
List<OmKeyLocationInfo> locationList) throws Exception {
addKeyToTable(openKeyTable, false, volumeName, bucketName, keyName,
clientID, replicationType, replicationFactor, 0L, omMetadataManager,
locationList);
}


/**
* Add key entry to KeyTable. if openKeyTable flag is true, add's entries
* to openKeyTable, else add's it to keyTable.
Expand Down Expand Up @@ -137,14 +164,34 @@ public static void addKeyToTable(boolean openKeyTable, boolean addToCache,
String volumeName, String bucketName, String keyName, long clientID,
HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor replicationFactor, long trxnLogIndex,
OMMetadataManager omMetadataManager) throws Exception {
OMMetadataManager omMetadataManager,
List<OmKeyLocationInfo> locationList) throws Exception {

OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName,
replicationType, replicationFactor, trxnLogIndex);
omKeyInfo.appendNewBlocks(locationList, false);

addKeyToTable(openKeyTable, addToCache, omKeyInfo, clientID, trxnLogIndex,
omMetadataManager);
}

/**
* Add key entry to KeyTable. if openKeyTable flag is true, add's entries
* to openKeyTable, else add's it to keyTable.
* @throws Exception
*/
@SuppressWarnings("parameternumber")
public static void addKeyToTable(boolean openKeyTable, boolean addToCache,
String volumeName, String bucketName, String keyName, long clientID,
HddsProtos.ReplicationType replicationType,
HddsProtos.ReplicationFactor replicationFactor, long trxnLogIndex,
OMMetadataManager omMetadataManager) throws Exception {

OmKeyInfo omKeyInfo = createOmKeyInfo(volumeName, bucketName, keyName,
replicationType, replicationFactor, trxnLogIndex);

addKeyToTable(openKeyTable, addToCache, omKeyInfo, clientID, trxnLogIndex,
omMetadataManager);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,64 @@ public void testPreExecute() throws Exception {
doPreExecute(createCommitKeyRequest());
}

@Test
public void testValidateAndUpdateCacheWithUnknownBlockId() throws Exception {

OMRequest modifiedOmRequest =
doPreExecute(createCommitKeyRequest());

OMKeyCommitRequest omKeyCommitRequest =
new OMKeyCommitRequest(modifiedOmRequest);

// Append 3 blocks locations.
List<OmKeyLocationInfo> allocatedLocationList = getKeyLocation(3)
.stream().map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);

TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
clientID, replicationType, replicationFactor, omMetadataManager,
allocatedLocationList);

String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
keyName);

// Key should not be there in key table, as validateAndUpdateCache is
// still not called.
OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);

Assert.assertNull(omKeyInfo);

OMClientResponse omClientResponse =
omKeyCommitRequest.validateAndUpdateCache(ozoneManager,
100L, ozoneManagerDoubleBufferHelper);

Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omClientResponse.getOMResponse().getStatus());

// Entry should be deleted from openKey Table.
omKeyInfo = omMetadataManager.getOpenKeyTable().get(ozoneKey);
Assert.assertNull(omKeyInfo);

// Now entry should be created in key Table.
omKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);

Assert.assertNotNull(omKeyInfo);

// Check modification time

CommitKeyRequest commitKeyRequest = modifiedOmRequest.getCommitKeyRequest();
Assert.assertEquals(commitKeyRequest.getKeyArgs().getModificationTime(),
omKeyInfo.getModificationTime());

// Check block location.
Assert.assertEquals(allocatedLocationList,
omKeyInfo.getLatestVersionLocations().getLocationList());

}

@Test
public void testValidateAndUpdateCache() throws Exception {

Expand All @@ -62,11 +120,21 @@ public void testValidateAndUpdateCache() throws Exception {
OMKeyCommitRequest omKeyCommitRequest =
new OMKeyCommitRequest(modifiedOmRequest);


KeyArgs keyArgs = modifiedOmRequest.getCommitKeyRequest().getKeyArgs();

// Append new blocks
List<OmKeyLocationInfo> allocatedLocationList =
keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());

TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
omMetadataManager);

TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName, keyName,
clientID, replicationType, replicationFactor, omMetadataManager);
clientID, replicationType, replicationFactor, omMetadataManager,
allocatedLocationList);

String ozoneKey = omMetadataManager.getOzoneKey(volumeName, bucketName,
keyName);
Expand Down Expand Up @@ -107,6 +175,8 @@ public void testValidateAndUpdateCache() throws Exception {

Assert.assertEquals(locationInfoListFromCommitKeyRequest,
omKeyInfo.getLatestVersionLocations().getLocationList());
Assert.assertEquals(allocatedLocationList,
omKeyInfo.getLatestVersionLocations().getLocationList());

}

Expand Down Expand Up @@ -264,7 +334,7 @@ private OMRequest createCommitKeyRequest() {
KeyArgs.newBuilder().setDataSize(dataSize).setVolumeName(volumeName)
.setKeyName(keyName).setBucketName(bucketName)
.setType(replicationType).setFactor(replicationFactor)
.addAllKeyLocations(getKeyLocation()).build();
.addAllKeyLocations(getKeyLocation(5)).build();

CommitKeyRequest commitKeyRequest =
CommitKeyRequest.newBuilder().setKeyArgs(keyArgs)
Expand All @@ -279,10 +349,10 @@ private OMRequest createCommitKeyRequest() {
/**
* Create KeyLocation list.
*/
private List<KeyLocation> getKeyLocation() {
private List<KeyLocation> getKeyLocation(int count) {
List<KeyLocation> keyLocations = new ArrayList<>();

for (int i=0; i < 5; i++) {
for (int i=0; i < count; i++) {
KeyLocation keyLocation =
KeyLocation.newBuilder()
.setBlockID(HddsProtos.BlockID.newBuilder()
Expand Down