diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 01fa7c8a525d..25690ed10554 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -412,6 +412,7 @@ private OzoneConsts() { /** Metadata stored in OmKeyInfo. */ public static final String HSYNC_CLIENT_ID = "hsyncClientId"; + public static final String LEASE_RECOVERY = "leaseRecovery"; //GDPR public static final String GDPR_FLAG = "gdprEnabled"; diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index 20b4fb1ed001..c5cf619e3729 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -267,6 +267,8 @@ public enum ResultCodes { S3_SECRET_ALREADY_EXISTS, - INVALID_PATH + INVALID_PATH, + KEY_UNDER_LEASE_RECOVERY, + KEY_ALREADY_CLOSED } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index ad394bf4d1db..824346058dec 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -24,6 +24,7 @@ import java.util.UUID; import javax.annotation.Nonnull; + import org.apache.hadoop.fs.SafeModeAction; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.ozone.OzoneAcl; @@ -269,6 +270,19 @@ default void hsyncKey(OmKeyArgs args, long clientID) "this to be implemented, as write requests use a new approach."); } + /** + * Recovery and commit a key. This will make the change from the client visible. The client + * is identified by the clientID. + * + * @param args the key to commit + * @param clientID the client identification + * @throws IOException + */ + default void recoverKey(OmKeyArgs args, long clientID) + throws IOException { + throw new UnsupportedOperationException("OzoneManager does not require " + + "this to be implemented, as write requests use a new approach."); + } /** * Allocate a new block, it is assumed that the client is having an open key @@ -1086,11 +1100,10 @@ EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp, * @param volumeName - The volume name. * @param bucketName - The bucket name. * @param keyName - The key user want to recover. - * @return true if the file is already closed + * @return OmKeyInfo KeyInfo is file under recovery * @throws IOException if an error occurs */ - boolean recoverLease(String volumeName, String bucketName, - String keyName) throws IOException; + List recoverLease(String volumeName, String bucketName, String keyName) throws IOException; /** * Update modification time and access time of a file. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index a179ca5c4084..f9572b3bdc5b 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -777,13 +777,19 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId, @Override public void hsyncKey(OmKeyArgs args, long clientId) throws IOException { - updateKey(args, clientId, true); + updateKey(args, clientId, true, false); } @Override public void commitKey(OmKeyArgs args, long clientId) throws IOException { - updateKey(args, clientId, false); + updateKey(args, clientId, false, false); + } + + @Override + public void recoverKey(OmKeyArgs args, long clientId) + throws IOException { + updateKey(args, clientId, false, true); } public static void setReplicationConfig(ReplicationConfig replication, @@ -799,7 +805,7 @@ public static void setReplicationConfig(ReplicationConfig replication, b.setType(replication.getReplicationType()); } - private void updateKey(OmKeyArgs args, long clientId, boolean hsync) + private void updateKey(OmKeyArgs args, long clientId, boolean hsync, boolean recovery) throws IOException { CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder(); List locationInfoList = args.getLocationInfoList(); @@ -820,15 +826,13 @@ private void updateKey(OmKeyArgs args, long clientId, boolean hsync) req.setKeyArgs(keyArgsBuilder.build()); req.setClientID(clientId); req.setHsync(hsync); - + req.setRecovery(recovery); OMRequest omRequest = createOMRequest(Type.CommitKey) .setCommitKeyRequest(req) .build(); handleError(submitRequest(omRequest)); - - } @Override @@ -2440,21 +2444,23 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp, } @Override - public boolean recoverLease(String volumeName, String bucketName, - String keyName) throws IOException { + public List recoverLease(String volumeName, String bucketName, String keyName) throws IOException { RecoverLeaseRequest recoverLeaseRequest = - RecoverLeaseRequest.newBuilder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName) - .build(); + RecoverLeaseRequest.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); OMRequest omRequest = createOMRequest(Type.RecoverLease) - .setRecoverLeaseRequest(recoverLeaseRequest).build(); + .setRecoverLeaseRequest(recoverLeaseRequest).build(); RecoverLeaseResponse recoverLeaseResponse = - handleError(submitRequest(omRequest)).getRecoverLeaseResponse(); - return recoverLeaseResponse.getResponse(); + handleError(submitRequest(omRequest)).getRecoverLeaseResponse(); + ArrayList list = new ArrayList(); + list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo())); + list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo())); + return list; } @Override diff --git a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot index a721f2acbbe6..1f9646579dc1 100644 --- a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot +++ b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot @@ -36,12 +36,12 @@ Create volume bucket and put key *** Test Cases *** Test ozone debug recover for o3fs ${result} = Execute Lease recovery cli o3fs://${BUCKET}.${VOLUME}.om/${TESTFILE} - Should Contain ${result} Lease recovery SUCCEEDED + Should Contain ${result} Key: ${TESTFILE} is already closed ${result} = Execute Lease recovery cli o3fs://${BUCKET}.${VOLUME}.om/randomfile Should Contain ${result} not found Test ozone debug recover for ofs ${result} = Execute Lease recovery cli ofs://om/${VOLUME}/${BUCKET}/${TESTFILE} - Should Contain ${result} Lease recovery SUCCEEDED + Should Contain ${result} Key: ${TESTFILE} is already closed ${result} = Execute Lease recovery cli ofs://om/${VOLUME}/${BUCKET}/randomfile Should Contain ${result} not found diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java index 2b8a8cb02b3d..1cd8a7e2b4d1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java @@ -149,8 +149,7 @@ public void testRecovery() throws Exception { Thread.sleep(1000); } // The lease should have been recovered. - assertTrue("File should be closed", fs.recoverLease(file)); - assertTrue(fs.isFileClosed(file)); + assertTrue("File should be closed", fs.isFileClosed(file)); } finally { closeIgnoringKeyNotFound(stream); } diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 54cafbc0ad6d..737bf166c3a0 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -524,6 +524,8 @@ enum Status { S3_SECRET_ALREADY_EXISTS = 92; INVALID_PATH = 93; + KEY_UNDER_LEASE_RECOVERY = 94; + KEY_ALREADY_CLOSED = 95; } /** @@ -1413,6 +1415,7 @@ message CommitKeyRequest { required KeyArgs keyArgs = 1; required uint64 clientID = 2; optional bool hsync = 3; + optional bool recovery = 4; } message CommitKeyResponse { @@ -2070,7 +2073,8 @@ message RecoverLeaseRequest { } message RecoverLeaseResponse { - optional bool response = 1; + optional KeyInfo keyInfo = 1; + optional KeyInfo openKeyInfo = 2; } message SetTimesRequest { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 5fb3872d8819..3c6085828f4c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -4603,9 +4603,8 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp, } @Override - public boolean recoverLease(String volumeName, String bucketName, - String keyName) { - return false; + public List recoverLease(String volumeName, String bucketName, String keyName) { + return null; } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java index 4855d7f7ab0b..7b920f9949c4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java @@ -20,6 +20,8 @@ import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -34,14 +36,11 @@ import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.response.file.OMRecoverLeaseResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .RecoverLeaseRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .RecoverLeaseResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse; +import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.OzoneObj; @@ -58,10 +57,9 @@ import java.util.LinkedHashMap; import java.util.Map; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; -import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .Type.RecoverLease; /** * Perform actions for RecoverLease requests. @@ -75,6 +73,8 @@ public class OMRecoverLeaseRequest extends OMKeyRequest { private String keyName; private OmKeyInfo keyInfo; private String dbFileKey; + private OmKeyInfo openKeyInfo; + private String dbOpenFileKey; private OMMetadataManager omMetadataManager; @@ -141,28 +141,21 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, acquiredLock = getOmLockDetails().isLockAcquired(); validateBucketAndVolume(omMetadataManager, volumeName, bucketName); - String openKeyEntryName = doWork(ozoneManager, transactionLogIndex); + RecoverLeaseResponse recoverLeaseResponse = doWork(ozoneManager, transactionLogIndex); // Prepare response boolean responseCode = true; - omResponse - .setRecoverLeaseResponse( - RecoverLeaseResponse.newBuilder() - .setResponse(responseCode) - .build()) - .setCmdType(RecoverLease); - omClientResponse = - new OMRecoverLeaseResponse(omResponse.build(), getBucketLayout(), - keyInfo, dbFileKey, openKeyEntryName); + omResponse.setRecoverLeaseResponse(recoverLeaseResponse).setCmdType(RecoverLease); + omClientResponse = new OMRecoverLeaseResponse(omResponse.build(), getBucketLayout(), + dbOpenFileKey, openKeyInfo); omMetrics.incNumRecoverLease(); - LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}", volumeName, - bucketName, keyName); + LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}", + volumeName, bucketName, keyName); } catch (IOException | InvalidPathException ex) { LOG.error("Fail for recovering lease. Volume:{}, Bucket:{}, Key:{}", volumeName, bucketName, keyName, ex); exception = ex; omMetrics.incNumRecoverLeaseFails(); - omResponse.setCmdType(RecoverLease); omClientResponse = new OMRecoverLeaseResponse( createErrorOMResponse(omResponse, exception), getBucketLayout()); } finally { @@ -186,60 +179,56 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, return omClientResponse; } - private String doWork(OzoneManager ozoneManager, long transactionLogIndex) - throws IOException { - + private RecoverLeaseResponse doWork(OzoneManager ozoneManager, + long transactionLogIndex) throws IOException { final long volumeId = omMetadataManager.getVolumeId(volumeName); - final long bucketId = omMetadataManager.getBucketId( - volumeName, bucketName); + final long bucketId = omMetadataManager.getBucketId(volumeName, bucketName); Iterator pathComponents = Paths.get(keyName).iterator(); long parentID = OMFileRequest.getParentID(volumeId, bucketId, pathComponents, keyName, omMetadataManager, "Cannot recover file : " + keyName + " as parent directory doesn't exist"); String fileName = OzoneFSUtils.getFileName(keyName); - dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId, - parentID, fileName); + dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId, parentID, fileName); keyInfo = getKey(dbFileKey); if (keyInfo == null) { - throw new OMException("Key:" + keyName + " not found", KEY_NOT_FOUND); + throw new OMException("Key:" + keyName + " not found in keyTable", KEY_NOT_FOUND); } - final String clientId = keyInfo.getMetadata().remove( - OzoneConsts.HSYNC_CLIENT_ID); - if (clientId == null) { + + final String writerId = keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID); + if (writerId == null) { // if file is closed, do nothing and return right away. - LOG.warn("Key:" + keyName + " is already closed"); - return null; + throw new OMException("Key: " + keyName + " is already closed", KEY_ALREADY_CLOSED); } - String openFileDBKey = omMetadataManager.getOpenFileName( - volumeId, bucketId, parentID, fileName, Long.parseLong(clientId)); - if (openFileDBKey != null) { - commitKey(dbFileKey, keyInfo, fileName, ozoneManager, - transactionLogIndex); - removeOpenKey(openFileDBKey, fileName, transactionLogIndex); + + dbOpenFileKey = omMetadataManager.getOpenFileName( + volumeId, bucketId, parentID, fileName, Long.parseLong(writerId)); + openKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenFileKey); + if (openKeyInfo == null) { + throw new OMException("Open Key " + dbOpenFileKey + " not found in openKeyTable", KEY_NOT_FOUND); + } + + if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) { + LOG.debug("Key: " + keyName + " is already under recovery"); + } else { + openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true"); + openKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled()); + openKeyInfo.setModificationTime(Time.now()); + // Add to cache. + omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry( + new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, openKeyInfo)); } + keyInfo.setKeyName(keyName); + openKeyInfo.setKeyName(keyName); + RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder() + .setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true)) + .setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true)); - return openFileDBKey; + return rb.build(); } private OmKeyInfo getKey(String dbOzoneKey) throws IOException { return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); } - - private void commitKey(String dbOzoneKey, OmKeyInfo omKeyInfo, - String fileName, OzoneManager ozoneManager, - long transactionLogIndex) throws IOException { - omKeyInfo.setModificationTime(Time.now()); - omKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled()); - - OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbOzoneKey, - omKeyInfo, fileName, transactionLogIndex); - } - - private void removeOpenKey(String openKeyName, String fileName, - long transactionLogIndex) { - OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager, - openKeyName, null, fileName, transactionLogIndex); - } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java index 254aafcb508d..ababd28c0a0c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; /** @@ -138,7 +139,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { newAllocatedBlockRequest.setKeyLocation( omKeyLocationInfoList.get(0).getProtobuf(getOmRequest().getVersion())); - return getOmRequest().toBuilder().setUserInfo(getUserInfo()) + return getOmRequest().toBuilder().setUserInfo(getUserIfNotExists(ozoneManager)) .setAllocateBlockRequest(newAllocatedBlockRequest).build(); } @@ -206,6 +207,10 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, KEY_NOT_FOUND); } + if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) { + throw new OMException("Open Key " + openKeyName + " is under lease recovery", + KEY_UNDER_LEASE_RECOVERY); + } List newLocationList = Collections.singletonList( OmKeyLocationInfo.getFromProtobuf(blockLocation)); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java index eb5a9ccb53b0..9fd8fe0ffa5c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java @@ -59,6 +59,7 @@ import java.util.Map; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; /** @@ -136,7 +137,10 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, throw new OMException("Open Key not found " + openKeyName, KEY_NOT_FOUND); } - + if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) { + throw new OMException("Open Key " + openKeyName + " is under lease recovery", + KEY_UNDER_LEASE_RECOVERY); + } List newLocationList = Collections.singletonList( OmKeyLocationInfo.getFromProtobuf(blockLocation)); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java index ebcb1a040a4b..56d7b72e6201 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java @@ -68,7 +68,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.util.Time; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; @@ -102,12 +104,15 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { OmUtils.validateKeyName(StringUtils.removeEnd(keyArgs.getKeyName(), OzoneConsts.FS_FILE_COPYING_TEMP_SUFFIX)); } - boolean isHsync = commitKeyRequest.hasHsync() && - commitKeyRequest.getHsync(); + boolean isHsync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync(); + boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery(); boolean enableHsync = ozoneManager.getConfiguration().getBoolean( OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED_DEFAULT); - if (isHsync && !enableHsync) { + + // If hsynced is called for a file, then this file is hsynced, otherwise it's not hsynced. + // Currently, file lease recovery by design only supports recover hsynced file + if ((isHsync || isRecovery) && !enableHsync) { throw new OMException("Hsync is not enabled. To enable, " + "set ozone.fs.hsync.enabled = true", NOT_SUPPORTED_OPERATION); } @@ -156,17 +161,20 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); - boolean isHSync = commitKeyRequest.hasHsync() && - commitKeyRequest.getHsync(); - + boolean isHSync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync(); + boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery(); + // isHsync = true, a commit request as a result of client side hsync call + // isRecovery = true, a commit request as a result of client side recoverLease call + // none of isHsync and isRecovery is true, a commit request as a result of client side normal + // outputStream#close call. if (isHSync) { omMetrics.incNumKeyHSyncs(); } else { omMetrics.incNumKeyCommits(); } - LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}", - isHSync, volumeName, bucketName, keyName); + LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName = {}, keyName = {}", + isHSync, isRecovery, volumeName, bucketName, keyName); try { commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap); @@ -179,10 +187,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, commitKeyRequest.getClientID()); String dbOzoneKey = - omMetadataManager.getOzoneKey(volumeName, bucketName, - keyName); - String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName, - keyName, commitKeyRequest.getClientID()); + omMetadataManager.getOzoneKey(volumeName, bucketName, keyName); List locationInfoList = getOmKeyLocationInfos(ozoneManager, commitKeyArgs); @@ -225,9 +230,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Map oldKeyVersionsToDeleteMap = null; OmKeyInfo keyToDelete = omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey); + long writerClientId = commitKeyRequest.getClientID(); + if (isRecovery && keyToDelete != null) { + String clientId = keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID); + if (clientId == null) { + throw new OMException("Failed to recovery key, as " + + dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED); + } + writerClientId = Long.parseLong(clientId); + } + + final String clientIdString = String.valueOf(writerClientId); if (null != keyToDelete) { - final String clientIdString - = String.valueOf(commitKeyRequest.getClientID()); isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete) .map(WithMetadata::getMetadata) .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID)) @@ -235,21 +249,30 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, .isPresent(); } + String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName, + keyName, writerClientId); omKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey); if (omKeyInfo == null) { - String action = "commit"; - if (isHSync) { - action = "hsync"; - } + String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit"; throw new OMException("Failed to " + action + " key, as " + dbOpenKey + "entry is not found in the OpenKey table", KEY_NOT_FOUND); } + if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) && + omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) { + if (!isRecovery) { + throw new OMException("Cannot commit key " + dbOpenKey + " with " + OzoneConsts.LEASE_RECOVERY + + " metadata while recovery flag is not set in request", KEY_UNDER_LEASE_RECOVERY); + } + } omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf( commitKeyArgs.getMetadataList())); + if (isHSync) { - omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, - String.valueOf(commitKeyRequest.getClientID())); + omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, clientIdString); + } else if (isRecovery) { + omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID); + omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY); } omKeyInfo.setDataSize(commitKeyArgs.getDataSize()); omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime()); @@ -314,7 +337,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, // Add to cache of open key table and key table. if (!isHSync) { - // If isHSync = false, put a tombstone in OpenKeyTable cache, + // If !isHSync = true, put a tombstone in OpenKeyTable cache, // indicating the key is removed from OpenKeyTable. // So that this key can't be committed again. omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry( @@ -350,12 +373,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, } // Debug logging for any key commit operation, successful or not - LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}", - result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo); + LOG.debug("Key commit {} with isHSync = {}, isRecovery = {}, omKeyInfo = {}", + result == Result.SUCCESS ? "succeeded" : "failed", isHSync, isRecovery, omKeyInfo); if (!isHSync) { auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap, - exception, getOmRequest().getUserInfo())); + exception, getOmRequest().getUserInfo())); processResult(commitKeyRequest, volumeName, bucketName, keyName, omMetrics, exception, omKeyInfo, result); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java index a4b0b9fa0bb7..5ffa7b87fb02 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java @@ -57,7 +57,9 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; /** @@ -102,17 +104,20 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, OMClientResponse omClientResponse = null; boolean bucketLockAcquired = false; Result result; - boolean isHSync = commitKeyRequest.hasHsync() && - commitKeyRequest.getHsync(); - + boolean isHSync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync(); + boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery(); + // isHsync = true, a commit request as a result of client side hsync call + // isRecovery = true, a commit request as a result of client side recoverLease call + // none of isHsync and isRecovery is true, a commit request as a result of client side normal + // outputStream#close call. if (isHSync) { omMetrics.incNumKeyHSyncs(); } else { omMetrics.incNumKeyCommits(); } - LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}", - isHSync, volumeName, bucketName, keyName); + LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName = {}, keyName = {}", + isHSync, isRecovery, volumeName, bucketName, keyName); OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager(); @@ -126,7 +131,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, keyName, IAccessAuthorizer.ACLType.WRITE, commitKeyRequest.getClientID()); - Iterator pathComponents = Paths.get(keyName).iterator(); String dbOpenFileKey = null; @@ -150,29 +154,46 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, + " as parent directory doesn't exist"); String dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId, parentID, fileName); - dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId, - parentID, fileName, commitKeyRequest.getClientID()); + OmKeyInfo keyToDelete = + omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey); + long writerClientId = commitKeyRequest.getClientID(); + if (isRecovery && keyToDelete != null) { + String clientId = keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID); + if (clientId == null) { + throw new OMException("Failed to recovery key, as " + + dbFileKey + " is already closed", KEY_ALREADY_CLOSED); + } + writerClientId = Long.parseLong(clientId); + } + dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId, + parentID, fileName, writerClientId); omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(true, omMetadataManager, dbOpenFileKey, keyName); if (omKeyInfo == null) { - String action = "commit"; - if (isHSync) { - action = "hsync"; - } + String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit"; throw new OMException("Failed to " + action + " key, as " + - dbOpenFileKey + "entry is not found in the OpenKey table", - KEY_NOT_FOUND); + dbOpenFileKey + " entry is not found in the OpenKey table", KEY_NOT_FOUND); + } + + if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) && + omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) { + if (!isRecovery) { + throw new OMException("Cannot commit key " + dbOpenFileKey + " with " + OzoneConsts.LEASE_RECOVERY + + " metadata while recovery flag is not set in request", KEY_UNDER_LEASE_RECOVERY); + } } + omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf( commitKeyArgs.getMetadataList())); if (isHSync) { - omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, - String.valueOf(commitKeyRequest.getClientID())); + omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, String.valueOf(writerClientId)); + } else if (isRecovery) { + omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID); + omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY); } omKeyInfo.setDataSize(commitKeyArgs.getDataSize()); - omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime()); List uncommitted = @@ -187,11 +208,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, // creation after the knob turned on. boolean isPreviousCommitHsync = false; Map oldKeyVersionsToDeleteMap = null; - OmKeyInfo keyToDelete = - omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey); if (null != keyToDelete) { - final String clientIdString - = String.valueOf(commitKeyRequest.getClientID()); + final String clientIdString = String.valueOf(writerClientId); isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete) .map(WithMetadata::getMetadata) .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID)) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java index ef73ab5c821e..fcefa473ff4d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java @@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.key.OmKeyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; + .OMResponse; import javax.annotation.Nonnull; import java.io.IOException; @@ -39,16 +39,14 @@ @CleanupTableInfo(cleanupTables = {FILE_TABLE, OPEN_FILE_TABLE}) public class OMRecoverLeaseResponse extends OmKeyResponse { - private OmKeyInfo keyInfo; - private String dbFileKey; + private OmKeyInfo openKeyInfo; private String openKeyName; + public OMRecoverLeaseResponse(@Nonnull OMResponse omResponse, - BucketLayout bucketLayout, OmKeyInfo keyInfo, String dbFileKey, - String openKeyName) { + BucketLayout bucketLayout, String openKeyName, OmKeyInfo openKeyInfo) { super(omResponse, bucketLayout); - this.keyInfo = keyInfo; - this.dbFileKey = dbFileKey; this.openKeyName = openKeyName; + this.openKeyInfo = openKeyInfo; } /** @@ -64,12 +62,11 @@ public OMRecoverLeaseResponse(@Nonnull OMResponse omResponse, @Override protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { - // Delete from OpenKey table + // Update OpenKey table if (openKeyName != null) { - omMetadataManager.getOpenKeyTable(getBucketLayout()).deleteWithBatch( - batchOperation, openKeyName); - omMetadataManager.getKeyTable(getBucketLayout()) - .putWithBatch(batchOperation, dbFileKey, keyInfo); + // In INIT stage, update the keyInfo in openKeyTable + omMetadataManager.getOpenKeyTable(getBucketLayout()).putWithBatch( + batchOperation, openKeyName, openKeyInfo); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java index bc41b6aa0c8f..c4f90958c758 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java @@ -84,7 +84,7 @@ public void addToDBBatch(OMMetadataManager omMetadataManager, // Delete from OpenKey table if (!isHSync()) { omMetadataManager.getOpenKeyTable(getBucketLayout()) - .deleteWithBatch(batchOperation, openKeyName); + .deleteWithBatch(batchOperation, openKeyName); } omMetadataManager.getKeyTable(getBucketLayout()) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java index ff267f7c07a4..6073632e5520 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java @@ -77,18 +77,18 @@ public void addToDBBatch(OMMetadataManager omMetadataManager, // Delete from OpenKey table if commit if (!this.isHSync()) { omMetadataManager.getOpenKeyTable(getBucketLayout()) - .deleteWithBatch(batchOperation, getOpenKeyName()); + .deleteWithBatch(batchOperation, getOpenKeyName()); } OMFileRequest.addToFileTable(omMetadataManager, batchOperation, - getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID()); + getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID()); updateDeletedTable(omMetadataManager, batchOperation); // update bucket usedBytes. omMetadataManager.getBucketTable().putWithBatch(batchOperation, - omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(), - getOmBucketInfo().getBucketName()), getOmBucketInfo()); + omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(), + getOmBucketInfo().getBucketName()), getOmBucketInfo()); } @Override diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java index 45209258f7fe..f52af51919be 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java @@ -1571,7 +1571,7 @@ public static long addParentsToDirTable(String volumeName, String bucketName, OMRequestTestUtils.createOmDirectoryInfo(pathElement, ++objectId, parentId); OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo, - volumeName, bucketName, txnID, omMetaMgr); + volumeName, bucketName, ++txnID, omMetaMgr); parentId = omDirInfo.getObjectID(); } return parentId; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java index 5c10c0821d9a..476e2b2f935f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java @@ -18,39 +18,45 @@ package org.apache.hadoop.ozone.om.request.file; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequestWithFSO; +import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO; import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .KeyLocation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .RecoverLeaseRequest; -import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; -import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest; import org.apache.hadoop.util.Time; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB.setReplicationConfig; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; /** * Tests OMRecoverLeaseRequest. @@ -71,34 +77,190 @@ public BucketLayout getBucketLayout() { */ @Test public void testRecoverHsyncFile() throws Exception { - when(ozoneManager.getAclsEnabled()).thenReturn(true); - when(ozoneManager.getVolumeOwner( - anyString(), - any(IAccessAuthorizer.ACLType.class), any( - OzoneObj.ResourceType.class))) - .thenReturn("user"); - InetSocketAddress address = new InetSocketAddress("localhost", 10000); - when(ozoneManager.getOmRpcServerAddr()).thenReturn(address); - populateNamespace(true, true); + populateNamespace(true, true, true, true); OMClientResponse omClientResponse = validateAndUpdateCache(); + OMResponse omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo); + OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo); - Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, + omClientResponse = validateAndUpdateCacheForCommit(getNewKeyArgs(omKeyInfo, 0)); + omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + + verifyTables(true, false); + } + + /** + * Verify that RecoverLease request is idempotent. + * @throws Exception + */ + @Test + public void testInitStageIdempotent() throws Exception { + populateNamespace(true, true, true, true); + + // call recovery first time + OMClientResponse omClientResponse = validateAndUpdateCache(); + OMResponse omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo1 = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo1); + + // call recovery second time + omClientResponse = validateAndUpdateCache(); + omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo2 = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo2); + Assertions.assertEquals(keyInfo1.getKeyName(), keyInfo2.getKeyName()); + } + + /** + * Verify that COMMIT request for recovery is not idempotent. + * @throws Exception + */ + @Test + public void testCommitStageNotIdempotent() throws Exception { + populateNamespace(true, true, true, true); + + // call recovery + OMClientResponse omClientResponse = validateAndUpdateCache(); + OMResponse omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo); + OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo); + + KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0); + + // call commit first time + omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs); + omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + + // call commit second time + omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs); + omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_ALREADY_CLOSED, omResponse.getStatus()); + } + + /** + * Verify that RecoverLease COMMIT request has a new file length. + * @throws Exception + */ + @Test + public void testRecoverWithNewFileLength() throws Exception { + populateNamespace(true, true, true, true); + + // call recovery + OMClientResponse omClientResponse = validateAndUpdateCache(); + OMResponse omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo); + OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo); + + // call commit + long deltaLength = 100; + KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, deltaLength); + omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs); + omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + + // get file length and check the length is as expected + String ozoneKey = getFileName(); + OmKeyInfo omKeyInfoFetched = omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey); + Assertions.assertEquals(omKeyInfo.getDataSize(), omKeyInfoFetched.getDataSize()); + + // check the final block length is as expected + List locationInfoListFetched = + omKeyInfoFetched.getLatestVersionLocations().getBlocksLatestVersionOnly(); + List omKeyLocationInfos = omKeyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); + Assertions.assertEquals(omKeyLocationInfos.get(omKeyLocationInfos.size() - 1).getLength(), + locationInfoListFetched.get(locationInfoListFetched.size() - 1).getLength()); + + // check the committed file doesn't have HSYNC_CLIENT_ID and LEASE_RECOVERY metadata + Assertions.assertNull(omKeyInfoFetched.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID)); + Assertions.assertNull(omKeyInfoFetched.getMetadata().get(OzoneConsts.LEASE_RECOVERY)); + } + + /** + * Verify that RecoverLease COMMIT request has a new client ID. + * @throws Exception + */ + @Test + public void testRecoverWithNewClientID() throws Exception { + populateNamespace(true, true, true, true); + + // call recovery + OMClientResponse omClientResponse = validateAndUpdateCache(); + OMResponse omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo); + OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo); + + // call commit + KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0); + omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs, true, true); + omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + } + + /** + * Verify that an under recovery file will reject allocate block and further hsync call(commit). + * @throws Exception + */ + @Test + public void testRejectAllocateBlockAndHsync() throws Exception { + populateNamespace(true, true, true, true); + + // call recovery + OMClientResponse omClientResponse = validateAndUpdateCache(); + OMResponse omResponse = omClientResponse.getOMResponse(); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus()); + RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse(); + KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo(); + Assertions.assertNotNull(keyInfo); + OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo); + + // call allocate block + OMRequest request = createAllocateBlockRequest(volumeName, bucketName, keyName); + OMAllocateBlockRequestWithFSO omAllocateBlockRequest = + new OMAllocateBlockRequestWithFSO(request, getBucketLayout()); + request = omAllocateBlockRequest.preExecute(ozoneManager); + assertNotNull(request.getUserInfo()); + omAllocateBlockRequest = new OMAllocateBlockRequestWithFSO(request, getBucketLayout()); + omClientResponse = omAllocateBlockRequest.validateAndUpdateCache( + ozoneManager, 100L, ozoneManagerDoubleBufferHelper); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_UNDER_LEASE_RECOVERY, omClientResponse.getOMResponse().getStatus()); - verifyTables(true, true); + // call commit(hsync calls commit) + KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0); + omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs, false, false); + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_UNDER_LEASE_RECOVERY, + omClientResponse.getOMResponse().getStatus()); } /** - * verify that recover a closed file should be allowed (essentially no-op). - */ + * verify that recover a closed file. + **/ @Test public void testRecoverClosedFile() throws Exception { - populateNamespace(true, false); + populateNamespace(true, false, false, false); OMClientResponse omClientResponse = validateAndUpdateCache(); - Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, + Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_ALREADY_CLOSED, omClientResponse.getOMResponse().getStatus()); verifyTables(true, false); @@ -109,7 +271,7 @@ public void testRecoverClosedFile() throws Exception { */ @Test public void testRecoverOpenFile() throws Exception { - populateNamespace(false, true); + populateNamespace(false, false, true, false); OMClientResponse omClientResponse = validateAndUpdateCache(); @@ -125,7 +287,7 @@ public void testRecoverOpenFile() throws Exception { */ @Test public void testRecoverAbsentFile() throws Exception { - populateNamespace(false, false); + populateNamespace(false, false, false, false); OMClientResponse omClientResponse = validateAndUpdateCache(); @@ -135,8 +297,37 @@ public void testRecoverAbsentFile() throws Exception { verifyTables(false, false); } - private void populateNamespace(boolean addKeyTable, boolean addOpenKeyTable) - throws Exception { + private KeyArgs getNewKeyArgs(OmKeyInfo omKeyInfo, long deltaLength) throws IOException { + OmKeyLocationInfoGroup omKeyLocationInfoGroup = omKeyInfo.getLatestVersionLocations(); + List omKeyLocationInfoList = omKeyLocationInfoGroup.getBlocksLatestVersionOnly(); + long lastBlockLength = omKeyLocationInfoList.get(omKeyLocationInfoList.size() - 1).getLength(); + omKeyLocationInfoList.get(omKeyLocationInfoList.size() - 1).setLength(lastBlockLength + deltaLength); + + long fileLength = omKeyLocationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum(); + omKeyInfo.setDataSize(fileLength); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(omKeyInfo.getVolumeName()) + .setBucketName(omKeyInfo.getBucketName()).setKeyName(omKeyInfo.getKeyName()) + .setReplicationConfig(omKeyInfo.getReplicationConfig()).setDataSize(fileLength) + .setLocationInfoList(omKeyLocationInfoList).setLatestVersionLocation(true) + .build(); + + List locationInfoList = keyArgs.getLocationInfoList(); + Preconditions.checkNotNull(locationInfoList); + KeyArgs.Builder keyArgsBuilder = KeyArgs.newBuilder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setDataSize(keyArgs.getDataSize()) + .addAllMetadata(KeyValueUtil.toProtobuf(keyArgs.getMetadata())) + .addAllKeyLocations(locationInfoList.stream() + .map(info -> info.getProtobuf(ClientVersion.CURRENT_VERSION)) + .collect(Collectors.toList())); + setReplicationConfig(keyArgs.getReplicationConfig(), keyArgsBuilder); + return keyArgsBuilder.build(); + } + + private void populateNamespace(boolean addKeyTable, boolean keyInfoWithHsyncFlag, + boolean addOpenKeyTable, boolean openKeyInfoWithHsyncFlag) throws Exception { String parentDir = "c/d/e"; String fileName = "f"; keyName = parentDir + "/" + fileName; @@ -151,14 +342,14 @@ private void populateNamespace(boolean addKeyTable, boolean addOpenKeyTable) OmKeyInfo omKeyInfo; if (addKeyTable) { - String ozoneKey = addToFileTable(allocatedLocationList); + String ozoneKey = addToFileTable(allocatedLocationList, keyInfoWithHsyncFlag); omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()) .get(ozoneKey); assertNotNull(omKeyInfo); } if (addOpenKeyTable) { - String openKey = addToOpenFileTable(allocatedLocationList); + String openKey = addToOpenFileTable(allocatedLocationList, openKeyInfoWithHsyncFlag); omKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout()) .get(openKey); @@ -166,22 +357,33 @@ private void populateNamespace(boolean addKeyTable, boolean addOpenKeyTable) } } + protected OMRequest createAllocateBlockRequest(String volumeName, String bucketName, String keyName) { + KeyArgs keyArgs = KeyArgs.newBuilder() + .setVolumeName(volumeName).setBucketName(bucketName) + .setKeyName(keyName) + .setFactor(replicationFactor).setType(replicationType) + .build(); + + AllocateBlockRequest allocateBlockRequest = + AllocateBlockRequest.newBuilder().setClientID(clientID).setKeyArgs(keyArgs).build(); + + return OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock) + .setClientId(UUID.randomUUID().toString()) + .setAllocateBlockRequest(allocateBlockRequest).build(); + } + @NotNull protected OMRequest createRecoverLeaseRequest( String volumeName, String bucketName, String keyName) { - - RecoverLeaseRequest recoverLeaseRequest = RecoverLeaseRequest.newBuilder() - .setVolumeName(volumeName) - .setBucketName(bucketName) - .setKeyName(keyName).build(); - + RecoverLeaseRequest.Builder rb = RecoverLeaseRequest.newBuilder(); + rb.setVolumeName(volumeName).setBucketName(bucketName).setKeyName(keyName); return OMRequest.newBuilder() .setCmdType(OzoneManagerProtocolProtos.Type.RecoverLease) .setClientId(UUID.randomUUID().toString()) - .setRecoverLeaseRequest(recoverLeaseRequest).build(); + .setRecoverLeaseRequest(rb.build()).build(); } - private OMClientResponse validateAndUpdateCache() throws Exception { OMRequest modifiedOmRequest = doPreExecute(createRecoverLeaseRequest( volumeName, bucketName, keyName)); @@ -196,6 +398,34 @@ private OMClientResponse validateAndUpdateCache() throws Exception { return omClientResponse; } + @NotNull + protected OMRequest createKeyCommitRequest(KeyArgs keyArgs, boolean newClientID, boolean recovery) { + CommitKeyRequest.Builder rb = + CommitKeyRequest.newBuilder().setKeyArgs(keyArgs).setRecovery(recovery); + rb.setClientID(newClientID ? clientID + 1 : clientID); + return OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.CommitKey) + .setClientId(UUID.randomUUID().toString()) + .setCommitKeyRequest(rb.build()).build(); + } + + private OMClientResponse validateAndUpdateCacheForCommit(KeyArgs keyArgs) throws Exception { + return validateAndUpdateCacheForCommit(keyArgs, false, true); + } + + private OMClientResponse validateAndUpdateCacheForCommit(KeyArgs keyArgs, boolean newClientID, + boolean recovery) throws Exception { + OMRequest omRequest = createKeyCommitRequest(keyArgs, newClientID, recovery); + OMKeyCommitRequestWithFSO omKeyCommitRequest = new OMKeyCommitRequestWithFSO(omRequest, getBucketLayout()); + OMRequest modifiedOmRequest = omKeyCommitRequest.preExecute(ozoneManager); + assertNotNull(modifiedOmRequest.getUserInfo()); + + omKeyCommitRequest = new OMKeyCommitRequestWithFSO(modifiedOmRequest, getBucketLayout()); + OMClientResponse omClientResponse = + omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L, ozoneManagerDoubleBufferHelper); + return omClientResponse; + } + private void verifyTables(boolean hasKey, boolean hasOpenKey) throws IOException { // Now entry should be created in key Table. @@ -238,8 +468,7 @@ String getFileName() throws IOException { fileName); } - protected OMRecoverLeaseRequest getOmRecoverLeaseRequest( - OMRequest omRequest) { + protected OMRecoverLeaseRequest getOmRecoverLeaseRequest(OMRequest omRequest) { return new OMRecoverLeaseRequest(omRequest); } @@ -269,14 +498,16 @@ private OMRequest doPreExecute(OMRequest originalOMRequest) throws Exception { return modifiedOmRequest; } - String addToOpenFileTable(List locationList) + String addToOpenFileTable(List locationList, boolean hsyncFlag) throws Exception { OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, keyName, replicationType, replicationFactor, 0, parentId, 0, Time.now(), version); omKeyInfo.appendNewBlocks(locationList, false); - omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, - String.valueOf(clientID)); + if (hsyncFlag) { + omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, + String.valueOf(clientID)); + } OMRequestTestUtils.addFileToKeyTable( true, false, omKeyInfo.getFileName(), @@ -291,12 +522,16 @@ String addToOpenFileTable(List locationList) omKeyInfo.getParentObjectID(), omKeyInfo.getFileName(), clientID); } - String addToFileTable(List locationList) + String addToFileTable(List locationList, boolean hsyncFlag) throws Exception { OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName, bucketName, keyName, replicationType, replicationFactor, 0, parentId, 0, Time.now(), version); omKeyInfo.appendNewBlocks(locationList, false); + if (hsyncFlag) { + omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, + String.valueOf(clientID)); + } OMRequestTestUtils.addFileToKeyTable( false, false, omKeyInfo.getFileName(), diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java index a72756d01087..40fccb8fda64 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java @@ -161,6 +161,6 @@ protected OMKeyCommitResponse getOmKeyCommitResponse(OmKeyInfo omKeyInfo, new RepeatedOmKeyInfo(e))); } return new OMKeyCommitResponse(omResponse, omKeyInfo, ozoneKey, openKey, - omBucketInfo, deleteKeyMap, isHSync); + omBucketInfo, deleteKeyMap, isHSync); } } diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java index a60134d4f649..e510d9841be4 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java @@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; @@ -733,13 +734,20 @@ private SnapshotDiffReportOzone getSnapshotDiffReportOnceComplete( } @Override - public boolean recoverLease(final String pathStr) throws IOException { - incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1); + public List recoverFilePrepare(final String pathStr) throws IOException { + incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1); return ozoneClient.getProxy().getOzoneManagerClient().recoverLease( volume.getName(), bucket.getName(), pathStr); } + @Override + public void recoverFile(OmKeyArgs keyArgs) throws IOException { + incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1); + + ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L); + } + @Override public void setTimes(String key, long mtime, long atime) throws IOException { incrementCounter(Statistic.INVOCATION_SET_TIMES, 1); diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java index 085a2300fd41..567004bc81ca 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java @@ -70,6 +70,7 @@ import org.apache.hadoop.ozone.client.BucketArgs; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; @@ -1403,8 +1404,8 @@ public boolean isFileClosed(String pathStr) throws IOException { } @Override - public boolean recoverLease(final String pathStr) throws IOException { - incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1); + public List recoverFilePrepare(final String pathStr) throws IOException { + incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1); OFSPath ofsPath = new OFSPath(pathStr, config); OzoneVolume volume = objectStore.getVolume(ofsPath.getVolumeName()); @@ -1413,6 +1414,13 @@ public boolean recoverLease(final String pathStr) throws IOException { volume.getName(), bucket.getName(), ofsPath.getKeyName()); } + @Override + public void recoverFile(OmKeyArgs keyArgs) throws IOException { + incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1); + + ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L); + } + @Override public void setTimes(String key, long mtime, long atime) throws IOException { incrementCounter(Statistic.INVOCATION_SET_TIMES, 1); diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java index c48f1a6366fe..dbce02dd5665 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.SafeModeAction; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; import org.apache.hadoop.security.token.Token; @@ -95,7 +97,9 @@ SnapshotDiffReport getSnapshotDiffReport(Path snapshotDir, String fromSnapshot, String toSnapshot) throws IOException, InterruptedException; - boolean recoverLease(String pathStr) throws IOException; + List recoverFilePrepare(String pathStr) throws IOException; + + void recoverFile(OmKeyArgs keyArgs) throws IOException; void setTimes(String key, long mtime, long atime) throws IOException; diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java index aae71e9c4cde..10abc570918b 100644 --- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java +++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java @@ -77,8 +77,9 @@ public enum Statistic { "Calls of setTimes()"), INVOCATION_IS_FILE_CLOSED("op_is_file_closed", "Calls of isFileClosed()"), - INVOCATION_RECOVER_LEASE("op_recover_lease", - "Calls of recoverLease()"), + INVOCATION_RECOVER_FILE_PREPARE("op_recover_file_prepare", + "Calls of recoverFilePrepare()"), + INVOCATION_RECOVER_FILE("op_recover_file", "Calls of recoverFile()"), INVOCATION_SET_SAFE_MODE("op_set_safe_mode", "Calls of setSafeMode()"); diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java index 203e7ed373eb..8f977167a2d3 100644 --- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.List; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; @@ -33,6 +34,8 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.security.token.DelegationTokenIssuer; /** @@ -129,7 +132,16 @@ public boolean recoverLease(Path f) throws IOException { LOG.trace("recoverLease() path:{}", f); Path qualifiedPath = makeQualified(f); String key = pathToKey(qualifiedPath); - return getAdapter().recoverLease(key); + List infoList = getAdapter().recoverFilePrepare(key); + // TODO: query DN to get the final block length + OmKeyInfo keyInfo = infoList.get(0); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName()) + .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName()) + .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize()) + .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList()) + .build(); + getAdapter().recoverFile(keyArgs); + return true; } @Override diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java index 9b1596c05b9e..0530b606b299 100644 --- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java @@ -29,11 +29,14 @@ import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.security.token.DelegationTokenIssuer; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.List; /** * The Rooted Ozone Filesystem (OFS) implementation. @@ -131,17 +134,24 @@ public boolean hasPathCapability(final Path path, final String capability) */ @Override public boolean recoverLease(final Path f) throws IOException { - incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1); statistics.incrementWriteOps(1); LOG.trace("recoverLease() path:{}", f); Path qualifiedPath = makeQualified(f); String key = pathToKey(qualifiedPath); - return getAdapter().recoverLease(key); + List infoList = getAdapter().recoverFilePrepare(key); + // TODO: query DN to get the final block length + OmKeyInfo keyInfo = infoList.get(0); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName()) + .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName()) + .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize()) + .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList()) + .build(); + getAdapter().recoverFile(keyArgs); + return true; } @Override public boolean isFileClosed(Path f) throws IOException { - incrementCounter(Statistic.INVOCATION_IS_FILE_CLOSED, 1); statistics.incrementReadOps(1); LOG.trace("isFileClosed() path:{}", f); Path qualifiedPath = makeQualified(f); diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java index 71f01e441427..a31a07cf6763 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.List; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; @@ -33,6 +34,8 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.security.token.DelegationTokenIssuer; /** @@ -129,7 +132,16 @@ public boolean recoverLease(Path f) throws IOException { LOG.trace("isFileClosed() path:{}", f); Path qualifiedPath = makeQualified(f); String key = pathToKey(qualifiedPath); - return getAdapter().recoverLease(key); + List infoList = getAdapter().recoverFilePrepare(key); + // TODO: query DN to get the final block length + OmKeyInfo keyInfo = infoList.get(0); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName()) + .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName()) + .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize()) + .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList()) + .build(); + getAdapter().recoverFile(keyArgs); + return true; } @Override diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java index 7561e20a875d..ed62f608b4ae 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java @@ -29,11 +29,14 @@ import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.security.token.DelegationTokenIssuer; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.List; /** * The Rooted Ozone Filesystem (OFS) implementation. @@ -128,7 +131,16 @@ public boolean recoverLease(final Path f) throws IOException { LOG.trace("recoverLease() path:{}", f); Path qualifiedPath = makeQualified(f); String key = pathToKey(qualifiedPath); - return getAdapter().recoverLease(key); + List infoList = getAdapter().recoverFilePrepare(key); + // TODO: query DN to get the final block length + OmKeyInfo keyInfo = infoList.get(0); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName()) + .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName()) + .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize()) + .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList()) + .build(); + getAdapter().recoverFile(keyArgs); + return true; } @Override