Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ private OzoneConsts() {

/** Metadata stored in OmKeyInfo. */
public static final String HSYNC_CLIENT_ID = "hsyncClientId";
public static final String LEASE_RECOVERY = "leaseRecovery";

//GDPR
public static final String GDPR_FLAG = "gdprEnabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ public enum ResultCodes {

S3_SECRET_ALREADY_EXISTS,

INVALID_PATH
INVALID_PATH,
KEY_UNDER_LEASE_RECOVERY,
KEY_ALREADY_CLOSED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.UUID;

import javax.annotation.Nonnull;

import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.OzoneAcl;
Expand Down Expand Up @@ -269,6 +270,19 @@ default void hsyncKey(OmKeyArgs args, long clientID)
"this to be implemented, as write requests use a new approach.");
}

/**
* Recovery and commit a key. This will make the change from the client visible. The client
* is identified by the clientID.
*
* @param args the key to commit
* @param clientID the client identification
* @throws IOException
*/
default void recoverKey(OmKeyArgs args, long clientID)
throws IOException {
throw new UnsupportedOperationException("OzoneManager does not require " +
"this to be implemented, as write requests use a new approach.");
}

/**
* Allocate a new block, it is assumed that the client is having an open key
Expand Down Expand Up @@ -1086,11 +1100,10 @@ EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
* @param volumeName - The volume name.
* @param bucketName - The bucket name.
* @param keyName - The key user want to recover.
* @return true if the file is already closed
* @return OmKeyInfo KeyInfo is file under recovery
* @throws IOException if an error occurs
*/
boolean recoverLease(String volumeName, String bucketName,
String keyName) throws IOException;
List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) throws IOException;

/**
* Update modification time and access time of a file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,13 +777,19 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
@Override
public void hsyncKey(OmKeyArgs args, long clientId)
throws IOException {
updateKey(args, clientId, true);
updateKey(args, clientId, true, false);
}

@Override
public void commitKey(OmKeyArgs args, long clientId)
throws IOException {
updateKey(args, clientId, false);
updateKey(args, clientId, false, false);
}

@Override
public void recoverKey(OmKeyArgs args, long clientId)
throws IOException {
updateKey(args, clientId, false, true);
}

public static void setReplicationConfig(ReplicationConfig replication,
Expand All @@ -799,7 +805,7 @@ public static void setReplicationConfig(ReplicationConfig replication,
b.setType(replication.getReplicationType());
}

private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
private void updateKey(OmKeyArgs args, long clientId, boolean hsync, boolean recovery)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
Expand All @@ -820,15 +826,13 @@ private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
req.setKeyArgs(keyArgsBuilder.build());
req.setClientID(clientId);
req.setHsync(hsync);

req.setRecovery(recovery);

OMRequest omRequest = createOMRequest(Type.CommitKey)
.setCommitKeyRequest(req)
.build();

handleError(submitRequest(omRequest));


}

@Override
Expand Down Expand Up @@ -2440,21 +2444,23 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
}

@Override
public boolean recoverLease(String volumeName, String bucketName,
String keyName) throws IOException {
public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) throws IOException {
RecoverLeaseRequest recoverLeaseRequest =
RecoverLeaseRequest.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.build();
RecoverLeaseRequest.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.build();

OMRequest omRequest = createOMRequest(Type.RecoverLease)
.setRecoverLeaseRequest(recoverLeaseRequest).build();
.setRecoverLeaseRequest(recoverLeaseRequest).build();

RecoverLeaseResponse recoverLeaseResponse =
handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
return recoverLeaseResponse.getResponse();
handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
ArrayList<OmKeyInfo> list = new ArrayList();
list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()));
list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the second element is not used by callers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jojochuang , openKeyInfo is return for client. There is one case, suppose a hsynced file, a new block is allocated to it, then client writes some data to this new block, and crashes before it calls hsync for data on this new block, then the openKeyInfo will have one more block than keyInfo. In this case, If we want to recover the last new block length, then we need the openKeyInfo info. If we only recover the last block that hsynced ever called, then keyInfo is enough. The question is, what's expectation from user? Does recovering the last hsynced block is user's expectation?

return list;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ Create volume bucket and put key
*** Test Cases ***
Test ozone debug recover for o3fs
${result} = Execute Lease recovery cli o3fs://${BUCKET}.${VOLUME}.om/${TESTFILE}
Should Contain ${result} Lease recovery SUCCEEDED
Should Contain ${result} Key: ${TESTFILE} is already closed
${result} = Execute Lease recovery cli o3fs://${BUCKET}.${VOLUME}.om/randomfile
Should Contain ${result} not found

Test ozone debug recover for ofs
${result} = Execute Lease recovery cli ofs://om/${VOLUME}/${BUCKET}/${TESTFILE}
Should Contain ${result} Lease recovery SUCCEEDED
Should Contain ${result} Key: ${TESTFILE} is already closed
${result} = Execute Lease recovery cli ofs://om/${VOLUME}/${BUCKET}/randomfile
Should Contain ${result} not found
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ public void testRecovery() throws Exception {
Thread.sleep(1000);
}
// The lease should have been recovered.
assertTrue("File should be closed", fs.recoverLease(file));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does recoverLease() throw except if the file is already closed? If so, it would be a deviation from HDFS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the second call of recoverLease will fail if the first has succeeded, since the file is already committed so checks in OM side will fail. Should we keep the same behavior as HDFS? I remember @szetszwo has mentioned a case that if after a Hbase region server fails, two new Hbase region servers start on the server, if the second region server calls recoverLease and gets a successful result, then there could be two region servers started and running on the same server. Not sure if Hbase has other checks to prevent the second region server to start so that the two region servers running altogether will not happen.

assertTrue(fs.isFileClosed(file));
assertTrue("File should be closed", fs.isFileClosed(file));
} finally {
closeIgnoringKeyNotFound(stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ enum Status {
S3_SECRET_ALREADY_EXISTS = 92;

INVALID_PATH = 93;
KEY_UNDER_LEASE_RECOVERY = 94;
KEY_ALREADY_CLOSED = 95;
}

/**
Expand Down Expand Up @@ -1413,6 +1415,7 @@ message CommitKeyRequest {
required KeyArgs keyArgs = 1;
required uint64 clientID = 2;
optional bool hsync = 3;
optional bool recovery = 4;
}

message CommitKeyResponse {
Expand Down Expand Up @@ -2070,7 +2073,8 @@ message RecoverLeaseRequest {
}

message RecoverLeaseResponse {
optional bool response = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially incompatible. But given that the feature is disabled by default, I agree this is acceptable.

optional KeyInfo keyInfo = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keyInfo is required or just openKeyInfo would suffice here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refer to one of above answer to Wei-Chiu's comment.

optional KeyInfo openKeyInfo = 2;
}

message SetTimesRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4603,9 +4603,8 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
}

@Override
public boolean recoverLease(String volumeName, String bucketName,
String keyName) {
return false;
public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) {
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.google.common.base.Preconditions;

import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
Expand All @@ -34,14 +36,11 @@
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.file.OMRecoverLeaseResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RecoverLeaseRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RecoverLeaseResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease;

import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
Expand All @@ -58,10 +57,9 @@
import java.util.LinkedHashMap;
import java.util.Map;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Type.RecoverLease;

/**
* Perform actions for RecoverLease requests.
Expand All @@ -75,6 +73,8 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
private String keyName;
private OmKeyInfo keyInfo;
private String dbFileKey;
private OmKeyInfo openKeyInfo;
private String dbOpenFileKey;

private OMMetadataManager omMetadataManager;

Expand Down Expand Up @@ -141,28 +141,21 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
acquiredLock = getOmLockDetails().isLockAcquired();
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);

String openKeyEntryName = doWork(ozoneManager, transactionLogIndex);
RecoverLeaseResponse recoverLeaseResponse = doWork(ozoneManager, transactionLogIndex);

// Prepare response
boolean responseCode = true;
omResponse
.setRecoverLeaseResponse(
RecoverLeaseResponse.newBuilder()
.setResponse(responseCode)
.build())
.setCmdType(RecoverLease);
omClientResponse =
new OMRecoverLeaseResponse(omResponse.build(), getBucketLayout(),
keyInfo, dbFileKey, openKeyEntryName);
omResponse.setRecoverLeaseResponse(recoverLeaseResponse).setCmdType(RecoverLease);
omClientResponse = new OMRecoverLeaseResponse(omResponse.build(), getBucketLayout(),
dbOpenFileKey, openKeyInfo);
omMetrics.incNumRecoverLease();
LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}", volumeName,
bucketName, keyName);
LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}",
volumeName, bucketName, keyName);
} catch (IOException | InvalidPathException ex) {
LOG.error("Fail for recovering lease. Volume:{}, Bucket:{}, Key:{}",
volumeName, bucketName, keyName, ex);
exception = ex;
omMetrics.incNumRecoverLeaseFails();
omResponse.setCmdType(RecoverLease);
omClientResponse = new OMRecoverLeaseResponse(
createErrorOMResponse(omResponse, exception), getBucketLayout());
} finally {
Expand All @@ -186,60 +179,56 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
return omClientResponse;
}

private String doWork(OzoneManager ozoneManager, long transactionLogIndex)
throws IOException {

private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think better to have restriction on some time gap between recoverLease and latest hsync call. Or else here recoverLease can be called immediately after hsync call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the patch is quite big now, the support of soft and hard limit is not implemented in this patch. We can file new JIRA to do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes makes sense.

long transactionLogIndex) throws IOException {
final long volumeId = omMetadataManager.getVolumeId(volumeName);
final long bucketId = omMetadataManager.getBucketId(
volumeName, bucketName);
final long bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
Iterator<Path> pathComponents = Paths.get(keyName).iterator();
long parentID = OMFileRequest.getParentID(volumeId, bucketId,
pathComponents, keyName, omMetadataManager,
"Cannot recover file : " + keyName
+ " as parent directory doesn't exist");
String fileName = OzoneFSUtils.getFileName(keyName);
dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
parentID, fileName);
dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId, parentID, fileName);

keyInfo = getKey(dbFileKey);
if (keyInfo == null) {
throw new OMException("Key:" + keyName + " not found", KEY_NOT_FOUND);
throw new OMException("Key:" + keyName + " not found in keyTable", KEY_NOT_FOUND);
}
final String clientId = keyInfo.getMetadata().remove(
OzoneConsts.HSYNC_CLIENT_ID);
if (clientId == null) {

final String writerId = keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
if (writerId == null) {
// if file is closed, do nothing and return right away.
LOG.warn("Key:" + keyName + " is already closed");
return null;
throw new OMException("Key: " + keyName + " is already closed", KEY_ALREADY_CLOSED);
}
String openFileDBKey = omMetadataManager.getOpenFileName(
volumeId, bucketId, parentID, fileName, Long.parseLong(clientId));
if (openFileDBKey != null) {
commitKey(dbFileKey, keyInfo, fileName, ozoneManager,
transactionLogIndex);
removeOpenKey(openFileDBKey, fileName, transactionLogIndex);

dbOpenFileKey = omMetadataManager.getOpenFileName(
volumeId, bucketId, parentID, fileName, Long.parseLong(writerId));
openKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenFileKey);
if (openKeyInfo == null) {
throw new OMException("Open Key " + dbOpenFileKey + " not found in openKeyTable", KEY_NOT_FOUND);
}

if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
LOG.debug("Key: " + keyName + " is already under recovery");
} else {
openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true");
openKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled());
openKeyInfo.setModificationTime(Time.now());
// Add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, openKeyInfo));
Comment on lines +218 to +220
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@smengcl smengcl Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e.

Suggested change
// Add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, openKeyInfo));
// Add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenFileKey, openKeyInfo, transactionLogIndex);

}
keyInfo.setKeyName(keyName);
openKeyInfo.setKeyName(keyName);
RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder()
.setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true))
.setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true));

return openFileDBKey;
return rb.build();
}

private OmKeyInfo getKey(String dbOzoneKey) throws IOException {
return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
}

private void commitKey(String dbOzoneKey, OmKeyInfo omKeyInfo,
String fileName, OzoneManager ozoneManager,
long transactionLogIndex) throws IOException {
omKeyInfo.setModificationTime(Time.now());
omKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled());

OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbOzoneKey,
omKeyInfo, fileName, transactionLogIndex);
}

private void removeOpenKey(String openKeyName, String fileName,
long transactionLogIndex) {
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
openKeyName, null, fileName, transactionLogIndex);
}
}
Loading