Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;

/**
* This class represents LeaseKeyInfo.
*/
public class LeaseKeyInfo {
private final OmKeyInfo keyInfo;
/**
* isKeyInfo = true indicates keyInfo is from keyTable.
* isKeyInfo = false indicates keyInfo is from openKeyTable.
*/
private boolean isKeyInfo;

public LeaseKeyInfo(OmKeyInfo info, boolean isKeyInfo) {
this.keyInfo = info;
this.isKeyInfo = isKeyInfo;
}

public boolean getIsKeyInfo() {
return this.isKeyInfo;
}

public OmKeyInfo getKeyInfo() {
return keyInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.ozone.om.helpers.DBUpdates;
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
Expand Down Expand Up @@ -1112,10 +1113,10 @@ EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
* @param bucketName - The bucket name.
* @param keyName - The key user want to recover.
* @param force - force recover the file.
* @return OmKeyInfo KeyInfo of file under recovery
* @return LeaseKeyInfo KeyInfo of file under recovery
* @throws IOException if an error occurs
*/
OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) throws IOException;
LeaseKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) throws IOException;

/**
* Update modification time and access time of a file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.ozone.om.helpers.DeleteTenantState;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
Expand Down Expand Up @@ -2476,7 +2477,7 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
}

@Override
public OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force)
public LeaseKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force)
throws IOException {
RecoverLeaseRequest recoverLeaseRequest =
RecoverLeaseRequest.newBuilder()
Expand All @@ -2492,7 +2493,8 @@ public OmKeyInfo recoverLease(String volumeName, String bucketName, String keyNa
RecoverLeaseResponse recoverLeaseResponse =
handleError(submitRequest(omRequest)).getRecoverLeaseResponse();

return OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo());
return new LeaseKeyInfo(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()),
recoverLeaseResponse.getIsKeyInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
Expand Down Expand Up @@ -190,6 +192,42 @@ public void testRecovery(int dataSize) throws Exception {
verifyData(data, dataSize * 2, file, fs);
}

@Test
public void testRecoveryWithoutHsyncHflushOnLastBlock() throws Exception {
RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);

int blockSize = (int) cluster.getOzoneManager().getConfiguration().getStorageSize(
OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);

final byte[] data = getData(blockSize / 2 + 1);

final FSDataOutputStream stream = fs.create(file, true);
try {
stream.write(data);
stream.hsync();
assertFalse(fs.isFileClosed(file));

// It will write into new block as well
// Don't do hsync/flush
stream.write(data);

int count = 0;
while (count++ < 15 && !fs.recoverLease(file)) {
Thread.sleep(1000);
}
// The lease should have been recovered.
assertTrue(fs.isFileClosed(file), "File should be closed");

// A second call to recoverLease should succeed too.
assertTrue(fs.recoverLease(file));
} finally {
closeIgnoringKeyNotFound(stream);
}

// open it again, make sure the data is correct
verifyData(data, blockSize / 2 + 1, file, fs);
}

@Test
public void testOBSRecoveryShouldFail() throws Exception {
// Set the fs.defaultFS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,7 @@ message RecoverLeaseRequest {
message RecoverLeaseResponse {
optional bool response = 1 [deprecated=true];
optional KeyInfo keyInfo = 2;
optional bool isKeyInfo = 3 [default = true];
}

message SetTimesRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneManagerVersion;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
Expand Down Expand Up @@ -4683,7 +4684,7 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
}

@Override
public OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) {
public LeaseKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,34 +248,44 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
List<OmKeyLocationInfo> openKeyLocationInfoList = openKeyLatestVersionLocations.getLocationList();

OmKeyLocationInfo finalBlock = null;
OmKeyLocationInfo penultimateBlock = null;
boolean returnKeyInfo = true;
if (openKeyLocationInfoList.size() > keyLocationInfoList.size() &&
openKeyModificationTime > keyInfo.getModificationTime() &&
openKeyLocationInfoList.size() > 0) {
finalBlock = openKeyLocationInfoList.get(openKeyLocationInfoList.size() - 1);
if (openKeyLocationInfoList.size() > 1) {
penultimateBlock = openKeyLocationInfoList.get(openKeyLocationInfoList.size() - 2);
}
returnKeyInfo = false;
} else if (keyLocationInfoList.size() > 0) {
finalBlock = keyLocationInfoList.get(keyLocationInfoList.size() - 1);
}
if (finalBlock != null) {
updateBlockInfo(ozoneManager, finalBlock);
updateBlockInfo(ozoneManager, penultimateBlock);

RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder();
rb.setKeyInfo(returnKeyInfo ? keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true) :
openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true));
rb.setIsKeyInfo(returnKeyInfo);

return rb.build();
}

private void updateBlockInfo(OzoneManager ozoneManager, OmKeyLocationInfo blockInfo) throws IOException {
if (blockInfo != null) {
// set token to last block if enabled
if (ozoneManager.isGrpcBlockTokenEnabled()) {
String remoteUser = getRemoteUser().getShortUserName();
OzoneBlockTokenSecretManager secretManager = ozoneManager.getBlockTokenSecretManager();
finalBlock.setToken(secretManager.generateToken(remoteUser, finalBlock.getBlockID(),
EnumSet.of(READ, WRITE), finalBlock.getLength()));
blockInfo.setToken(secretManager.generateToken(remoteUser, blockInfo.getBlockID(),
EnumSet.of(READ, WRITE), blockInfo.getLength()));
}
// refresh last block pipeline
ContainerWithPipeline containerWithPipeline =
ozoneManager.getScmClient().getContainerClient().getContainerWithPipeline(finalBlock.getContainerID());
finalBlock.setPipeline(containerWithPipeline.getPipeline());
ozoneManager.getScmClient().getContainerClient().getContainerWithPipeline(blockInfo.getContainerID());
blockInfo.setPipeline(containerWithPipeline.getPipeline());
}

RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder();
rb.setKeyInfo(returnKeyInfo ? keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true) :
openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true));

return rb.build();
}

private OmKeyInfo getKey(String dbOzoneKey) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
Expand Down Expand Up @@ -691,7 +692,7 @@ private SnapshotDiffReportOzone getSnapshotDiffReportOnceComplete(
}

@Override
public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force) throws IOException {
public LeaseKeyInfo recoverFilePrepare(final String pathStr, boolean force) throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);

return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
Expand Down Expand Up @@ -1364,7 +1365,7 @@ public boolean isFileClosed(String pathStr) throws IOException {
}

@Override
public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force) throws IOException {
public LeaseKeyInfo recoverFilePrepare(final String pathStr, boolean force) throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
OFSPath ofsPath = new OFSPath(pathStr, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -98,7 +98,7 @@ SnapshotDiffReport getSnapshotDiffReport(Path snapshotDir,
String fromSnapshot, String toSnapshot)
throws IOException, InterruptedException;

OmKeyInfo recoverFilePrepare(String pathStr, boolean force) throws IOException;
LeaseKeyInfo recoverFilePrepare(String pathStr, boolean force) throws IOException;

void recoverFile(OmKeyArgs keyArgs) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.LeaseKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;

/**
Expand Down Expand Up @@ -142,9 +145,9 @@ public boolean recoverLease(Path f) throws IOException {

Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
OmKeyInfo keyInfo = null;
LeaseKeyInfo leaseKeyInfo;
try {
keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
leaseKeyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
// key is already closed, let's just return success
Expand All @@ -154,25 +157,41 @@ public boolean recoverLease(Path f) throws IOException {
}

// finalize the final block and get block length
List<OmKeyLocationInfo> locationInfoList = keyInfo.getLatestVersionLocations().getLocationList();
List<OmKeyLocationInfo> locationInfoList = leaseKeyInfo.getKeyInfo().getLatestVersionLocations().getLocationList();
if (!locationInfoList.isEmpty()) {
OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 1);
try {
block.setLength(getAdapter().finalizeBlock(block));
} catch (Throwable e) {
if (!forceRecovery) {
if (e instanceof StorageContainerException && (((StorageContainerException) e).getResult().equals(NO_SUCH_BLOCK)
|| ((StorageContainerException) e).getResult().equals(CONTAINER_NOT_FOUND))
&& !leaseKeyInfo.getIsKeyInfo() && locationInfoList.size() > 1) {
locationInfoList = leaseKeyInfo.getKeyInfo().getLatestVersionLocations().getLocationList().subList(0,
locationInfoList.size() - 1);
block = locationInfoList.get(locationInfoList.size() - 1);
try {
block.setLength(getAdapter().finalizeBlock(block));
} catch (Throwable exp) {
if (!forceRecovery) {
throw exp;
}
LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
FORCE_LEASE_RECOVERY_ENV, exp);
}
} else if (!forceRecovery) {
throw e;
} else {
LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
FORCE_LEASE_RECOVERY_ENV, e);
}
LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
FORCE_LEASE_RECOVERY_ENV, e);
}
}

// recover and commit file
long keyLength = locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(leaseKeyInfo.getKeyInfo().getVolumeName())
.setBucketName(leaseKeyInfo.getKeyInfo().getBucketName()).setKeyName(leaseKeyInfo.getKeyInfo().getKeyName())
.setReplicationConfig(leaseKeyInfo.getKeyInfo().getReplicationConfig()).setDataSize(keyLength)
.setLocationInfoList(locationInfoList)
.build();
getAdapter().recoverFile(keyArgs);
Expand Down
Loading