Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -35,7 +36,11 @@
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.util.StringUtils;
import org.junit.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,8 +108,8 @@ public void testCreateFile() throws Exception {
// Op 1. create dir -> /d1/d2/d3/d4/
Path parent = new Path("/d1/d2/");
Path file = new Path(parent, "file1");
fs.create(file);
ArrayList<String> openFileKeys = new ArrayList<>();
FSDataOutputStream outputStream = fs.create(file);
String openFileKey = "";

OMMetadataManager omMgr = cluster.getOzoneManager().getMetadataManager();
OmBucketInfo omBucketInfo = omMgr.getBucketTable().get(
Expand All @@ -116,7 +121,7 @@ public void testCreateFile() throws Exception {
dirKeys, omMgr);
long d2ObjectID = verifyDirKey(d1ObjectID, "d2", "/d1/d2", dirKeys,
omMgr);
openFileKeys.add(d2ObjectID + OzoneConsts.OM_KEY_PREFIX + file.getName());
openFileKey = d2ObjectID + OzoneConsts.OM_KEY_PREFIX + file.getName();

// verify entries in directory table
TableIterator<String, ? extends
Expand All @@ -141,9 +146,18 @@ public void testCreateFile() throws Exception {
while (keysItr.hasNext()) {
count++;
Table.KeyValue<String, OmKeyInfo> value = keysItr.next();
verifyOpenKeyFormat(value.getKey(), openFileKeys);
verifyOpenKeyFormat(value.getKey(), openFileKey);
}
Assert.assertEquals("Unexpected file table entries!", 1, count);

// trigger CommitKeyRequest
outputStream.close();

Assert.assertTrue("Failed to commit the open file:" + openFileKey,
omMgr.getOpenKeyTable().isEmpty());

OmKeyInfo omKeyInfo = omMgr.getKeyTable().get(openFileKey);
Assert.assertNotNull("Invalid Key!", omKeyInfo);
}


Expand All @@ -168,13 +182,13 @@ private void verifyKeyFormat(String key, ArrayList<String> dirKeys) {
* openFileKeys list.
*
* @param key table keyName
* @param openFileKeys expected keyName
* @param openFileKey expected keyName
*/
private void verifyOpenKeyFormat(String key, ArrayList<String> openFileKeys) {
private void verifyOpenKeyFormat(String key, String openFileKey) {
String[] keyParts = StringUtils.split(key,
OzoneConsts.OM_KEY_PREFIX.charAt(0));
Assert.assertEquals("Invalid KeyName:" + key, 3, keyParts.length);
String[] expectedOpenFileParts = StringUtils.split(openFileKeys.get(0),
String[] expectedOpenFileParts = StringUtils.split(openFileKey,
OzoneConsts.OM_KEY_PREFIX.charAt(0));
Assert.assertEquals("ParentId/Key:" + expectedOpenFileParts[0]
+ " doesn't exists in openFileTable!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
* |----------------------------------------------------------------------|
* | directoryTable | parentId/directoryName -> DirectoryInfo |
* |----------------------------------------------------------------------|
* | fileTable | parentId/fileName/id -> FileInfo |
* | fileTable | parentId/fileName -> KeyInfo |
* |----------------------------------------------------------------------|
* | openFileTable | parentId/fileName -> FileInfo |
* | openFileTable | parentId/fileName/id -> KeyInfo |
* |----------------------------------------------------------------------|
* | transactionInfoTable | #TRANSACTIONINFO -> OMTransactionInfo |
* |----------------------------------------------------------------------|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.ozone.om.request.key.OMKeysDeleteRequest;
import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestV1;
import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
Expand Down Expand Up @@ -139,6 +140,9 @@ public static OMClientRequest createClientRequest(OMRequest omRequest) {
case CreateKey:
return new OMKeyCreateRequest(omRequest);
case CommitKey:
if (omLayoutVersionV1) {
return new OMKeyCommitRequestV1(omRequest);
}
return new OMKeyCommitRequest(omRequest);
case DeleteKey:
return new OMKeyDeleteRequest(omRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,33 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

return omClientResponse;
}

protected void processResult(CommitKeyRequest commitKeyRequest,
String volumeName, String bucketName,
String keyName, OMMetrics omMetrics,
IOException exception, OmKeyInfo omKeyInfo,
Result result) {
switch (result) {
case SUCCESS:
// As when we commit the key, then it is visible in ozone, so we should
// increment here.
// As key also can have multiple versions, we need to increment keys
// only if version is 0. Currently we have not complete support of
// versioning of keys. So, this can be revisited later.
if (omKeyInfo.getKeyLocationVersions().size() == 1) {
omMetrics.incNumKeys();
}
LOG.debug("Key committed. Volume:{}, Bucket:{}, Key:{}", volumeName,
bucketName, keyName);
break;
case FAILURE:
LOG.error("Key commit failed. Volume:{}, Bucket:{}, Key:{}. Exception:{}",
volumeName, bucketName, keyName, exception);
omMetrics.incNumKeyCommitFails();
break;
default:
LOG.error("Unrecognized Result for OMKeyCommitRequest: {}",
commitKeyRequest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyCommitResponseV1;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,10 +55,11 @@
import java.util.Map;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;

/**
* Handles CommitKey request.
* Handles CommitKey request layout version V1
*/
public class OMKeyCommitRequestV1 extends OMKeyCommitRequest {

Expand Down Expand Up @@ -115,7 +124,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
}

bucketLockAcquired =
omMetadataManager.getLock().acquireLock(BUCKET_LOCK,
omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
volumeName, bucketName);

validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
Expand All @@ -124,7 +133,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omBucketInfo = omMetadataManager.getBucketTable().get(bucketKey);
long bucketId = omBucketInfo.getObjectID();
long parentID = getParentID(bucketId, pathComponents, keyName,
omMetadataManager);
omMetadataManager, ozoneManager);
String dbFileKey = omMetadataManager.getOzonePathKey(parentID, fileName);
dbOpenFileKey = omMetadataManager.getOpenFileName(parentID, fileName,
commitKeyRequest.getClientID());
Expand Down Expand Up @@ -166,7 +175,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omVolumeArgs.getUsedBytes().add(correctedSpace);
omBucketInfo.getUsedBytes().add(correctedSpace);

omClientResponse = new OMKeyCommitResponse(omResponse.build(),
omClientResponse = new OMKeyCommitResponseV1(omResponse.build(),
omKeyInfo, dbFileKey, dbOpenFileKey, omVolumeArgs, omBucketInfo);

result = Result.SUCCESS;
Expand All @@ -180,40 +189,40 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
omDoubleBufferHelper);

if(bucketLockAcquired) {
omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
bucketName);
}
}

auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
exception, getOmRequest().getUserInfo()));

switch (result) {
case SUCCESS:
// As when we commit the key, then it is visible in ozone, so we should
// increment here.
// As key also can have multiple versions, we need to increment keys
// only if version is 0. Currently we have not complete support of
// versioning of keys. So, this can be revisited later.
if (omKeyInfo.getKeyLocationVersions().size() == 1) {
omMetrics.incNumKeys();
}
LOG.debug("Key committed. Volume:{}, Bucket:{}, Key:{}", volumeName,
bucketName, keyName);
break;
case FAILURE:
LOG.error("Key commit failed. Volume:{}, Bucket:{}, Key:{}. Exception:{}",
volumeName, bucketName, keyName, exception);
omMetrics.incNumKeyCommitFails();
break;
default:
LOG.error("Unrecognized Result for OMKeyCommitRequest: {}",
commitKeyRequest);
}
processResult(commitKeyRequest, volumeName, bucketName, keyName, omMetrics,
exception, omKeyInfo, result);

return omClientResponse;
}


/**
* Check for directory exists with same name, if it exists throw error.
*
* @param keyName key name
* @param ozoneManager Ozone Manager
* @param reachedLastPathComponent true if the path component is a fileName
* @throws IOException if directory exists with same name
*/
private void checkDirectoryAlreadyExists(String keyName,
OzoneManager ozoneManager,
boolean reachedLastPathComponent)
throws IOException {
// Reached last component, which would be a file. Returns its parentID.
if (reachedLastPathComponent && ozoneManager.getEnableFileSystemPaths()) {
throw new OMException("Can not create file: " + keyName +
" as there is already directory in the given path", NOT_A_FILE);
}
}

/**
* Get parent id for the user given path.
*
Expand All @@ -225,36 +234,42 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
* @throws IOException DB failure or parent not exists in DirectoryTable
*/
private long getParentID(long bucketId, Iterator<Path> pathComponents,
String keyName,
OMMetadataManager omMetadataManager)
String keyName, OMMetadataManager omMetadataManager,
OzoneManager ozoneManager)
throws IOException {

long lastKnownParentId = bucketId;
boolean parentFound = true; // default bucketID as parent
OmDirectoryInfo omDirectoryInfo = null;

// If no sub-dirs then bucketID is the root/parent.
if(!pathComponents.hasNext()){
return bucketId;
}

OmDirectoryInfo omDirectoryInfo;
while (pathComponents.hasNext()) {
String nodeName = pathComponents.next().toString();
// Reached last component, which would be a file. Returns its parentID.
if (!pathComponents.hasNext()) {
return lastKnownParentId;
}
boolean reachedLastPathComponent = !pathComponents.hasNext();
String dbNodeName =
omMetadataManager.getOzonePathKey(lastKnownParentId, nodeName);

omDirectoryInfo = omMetadataManager.
getDirectoryTable().get(dbNodeName);
if (omDirectoryInfo != null) {
checkDirectoryAlreadyExists(keyName, ozoneManager,
reachedLastPathComponent);
lastKnownParentId = omDirectoryInfo.getObjectID();
} else {
parentFound = false;
// One of the sub-dir doesn't exists in DB. Immediate parent should
// exists for committing the key, otherwise will fail the operation.
if (!reachedLastPathComponent) {
throw new OMException("Failed to commit key, as parent directory of "
+ keyName + " entry is not found in DirectoryTable",
KEY_NOT_FOUND);
}
break;
}
}

if (!parentFound) {
throw new OMException("Failed to commit key, as parent directory of "
+ keyName + " entry is not found in DirectoryTable",
KEY_NOT_FOUND);
}
return lastKnownParentId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public OMFileCreateResponseV1(@Nonnull OMResponse omResponse,
}

@Override
protected void addToDBBatch(OMMetadataManager omMetadataMgr,
public void addToDBBatch(OMMetadataManager omMetadataMgr,
BatchOperation batchOp) throws IOException {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,13 @@
@CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE})
public class OMKeyCommitResponseV1 extends OMKeyCommitResponse {

private OmKeyInfo omKeyInfo;
private String ozoneKeyName;
private String openKeyName;

public OMKeyCommitResponseV1(@Nonnull OMResponse omResponse,
@Nonnull OmKeyInfo omKeyInfo,
String ozoneKeyName, String openKeyName,
@Nonnull OmVolumeArgs omVolumeArgs,
@Nonnull OmBucketInfo omBucketInfo) {
super(omResponse, omKeyInfo, ozoneKeyName, openKeyName, omVolumeArgs,
omBucketInfo);
this.omKeyInfo = omKeyInfo;
this.ozoneKeyName = ozoneKeyName;
this.openKeyName = openKeyName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public OMKeyCreateResponse(@Nonnull OMResponse omResponse) {
}

@Override
protected void addToDBBatch(OMMetadataManager omMetadataManager,
public void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ public void testValidateAndUpdateCache() throws Exception {
// Check open table whether key is added or not.

omKeyInfo = verifyPathInOpenKeyTable(keyName, id, true);
Assert.assertNotNull(omKeyInfo);

List< OmKeyLocationInfo > omKeyLocationInfoList =
omKeyInfo.getLatestVersionLocations().getLocationList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
*/
public class TestOMKeyCommitRequest extends TestOMKeyRequest {

protected String parentDir;
private String parentDir;

@Test
public void testPreExecute() throws Exception {
Expand Down Expand Up @@ -296,6 +296,10 @@ private List<KeyLocation> getKeyLocation() {
return keyLocations;
}

protected String getParentDir() {
return parentDir;
}

@NotNull
protected String getOzonePathKey() throws IOException {
return omMetadataManager.getOzoneKey(volumeName, bucketName,
Expand Down
Loading