Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ public static boolean requireBlockToken(
case PutSmallFile:
case ReadChunk:
case WriteChunk:
case FinalizeBlock:
return true;
default:
return false;
Expand Down Expand Up @@ -566,6 +567,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) {
blockID = msg.getWriteChunk().getBlockID();
}
break;
case FinalizeBlock:
if (msg.hasFinalizeBlock()) {
blockID = msg.getFinalizeBlock().getBlockID();
}
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
Expand Down Expand Up @@ -263,6 +264,36 @@ public static XceiverClientReply putBlockAsync(XceiverClientSpi xceiverClient,
return xceiverClient.sendCommandAsync(request);
}

/**
* Calls the container protocol to finalize a container block.
*
* @param xceiverClient client to perform call
* @param blockID block ID to identify block
* @param token a token for this block (may be null)
* @return FinalizeBlockResponseProto
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.FinalizeBlockResponseProto finalizeBlock(
XceiverClientSpi xceiverClient, DatanodeBlockID blockID,
Token<OzoneBlockTokenIdentifier> token)
throws IOException {
FinalizeBlockRequestProto.Builder finalizeBlockRequest =
FinalizeBlockRequestProto.newBuilder().setBlockID(blockID);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.FinalizeBlock)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id)
.setFinalizeBlock(finalizeBlockRequest);
if (token != null) {
builder.setEncodedToken(token.encodeToUrlString());
}
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
return response.getFinalizeBlock();
}

public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, BlockData containerBlockData, boolean eof,
String tokenString) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ private OzoneConsts() {
/** Metadata stored in OmKeyInfo. */
public static final String HSYNC_CLIENT_ID = "hsyncClientId";
public static final String LEASE_RECOVERY = "leaseRecovery";
public static final String FORCE_LEASE_RECOVERY_ENV = "OZONE.CLIENT.RECOVER.LEASE.FORCE";

//GDPR
public static final String GDPR_FLAG = "gdprEnabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -423,68 +422,60 @@ public TransactionContext startTransaction(RaftClientRequest request)
}
return builder.build().setException(ioe);
}

boolean blockAlreadyFinalized = false;
if (proto.getCmdType() == Type.PutBlock) {
TransactionContext ctxt = rejectRequest(request,
proto.getContainerID(), proto.getPutBlock().getBlockData()
.getBlockID().getLocalID());
if (ctxt != null) {
return ctxt;
}
blockAlreadyFinalized = shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID());
} else if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
TransactionContext ctxt = rejectRequest(request,
proto.getContainerID(), write.getBlockID().getLocalID());
if (ctxt != null) {
return ctxt;
blockAlreadyFinalized = shouldRejectRequest(write.getBlockID());
if (!blockAlreadyFinalized) {
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
.setBlockID(write.getBlockID())
.setChunkData(write.getChunkData())
// skipping the data field as it is
// already set in statemachine data proto
.build();
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
.newBuilder(proto)
.setPipelineID(gid.getUuid().toString())
.setWriteChunk(commitWriteChunkProto)
.setTraceID(proto.getTraceID())
.build();
Preconditions.checkArgument(write.hasData());
Preconditions.checkArgument(!write.getData().isEmpty());

final Context context = new Context(proto, commitContainerCommandProto);
return builder
.setStateMachineContext(context)
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
}
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
.setBlockID(write.getBlockID())
.setChunkData(write.getChunkData())
// skipping the data field as it is
// already set in statemachine data proto
.build();
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
.newBuilder(proto)
.setPipelineID(gid.getUuid().toString())
.setWriteChunk(commitWriteChunkProto)
.setTraceID(proto.getTraceID())
.build();
Preconditions.checkArgument(write.hasData());
Preconditions.checkArgument(!write.getData().isEmpty());

final Context context = new Context(proto, commitContainerCommandProto);
return builder
.setStateMachineContext(context)
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
} else if (proto.getCmdType() == Type.FinalizeBlock) {
containerController.addFinalizedBlock(proto.getContainerID(),
proto.getFinalizeBlock().getBlockID().getLocalID());
}
final Context context = new Context(proto, proto);
return builder
.setStateMachineContext(context)
.setLogData(proto.toByteString())
.build();
}

@Nullable
private TransactionContext rejectRequest(RaftClientRequest request,
long containerId, long localId) {
if (containerController.isFinalizedBlockExist(containerId, localId)) {
TransactionContext ctxt = TransactionContext.newBuilder()
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
if (blockAlreadyFinalized) {
TransactionContext transactionContext = builder.build();
transactionContext.setException(new StorageContainerException("Block already finalized",
ContainerProtos.Result.BLOCK_ALREADY_FINALIZED));
return transactionContext;
} else {
final Context context = new Context(proto, proto);
return builder
.setStateMachineContext(context)
.setLogData(proto.toByteString())
.build();
ctxt.setException(new IOException("Block already finalized"));
return ctxt;
}
return null;
}

private boolean shouldRejectRequest(ContainerProtos.DatanodeBlockID blockID) {
return containerController.isFinalizedBlockExist(blockID.getContainerID(), blockID.getLocalID());
}

private static ContainerCommandRequestProto getContainerCommandRequestProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
Expand Down Expand Up @@ -139,6 +140,7 @@ public class KeyValueHandler extends Handler {
private final boolean validateChunkChecksumData;
// A striped lock that is held during container creation.
private final Striped<Lock> containerCreationLocks;
private static FaultInjector injector;

public KeyValueHandler(ConfigurationSource config,
String datanodeId,
Expand Down Expand Up @@ -567,6 +569,10 @@ ContainerCommandResponseProto handlePutBlock(

ContainerCommandResponseProto handleFinalizeBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
ContainerCommandResponseProto responseProto = checkFaultInjector(request);
if (responseProto != null) {
return responseProto;
}

if (!request.hasFinalizeBlock()) {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -646,6 +652,12 @@ ContainerCommandResponseProto handleGetBlock(
*/
ContainerCommandResponseProto handleGetCommittedBlockLength(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

ContainerCommandResponseProto responseProto = checkFaultInjector(request);
if (responseProto != null) {
return responseProto;
}

if (!request.hasGetCommittedBlockLength()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed Get Key request. trace ID: {}",
Expand Down Expand Up @@ -1444,8 +1456,34 @@ private void triggerVolumeScanAndThrowException(Container container,
throw new StorageContainerException(msg, result);
}

private ContainerCommandResponseProto checkFaultInjector(ContainerCommandRequestProto request) {
if (injector != null) {
Throwable ex = injector.getException();
if (ex != null) {
// reset injector
injector = null;
return ContainerUtils.logAndReturnError(LOG, (StorageContainerException) ex, request);
}
try {
injector.pause();
} catch (IOException e) {
// do nothing
}
}
return null;
}

public static Logger getLogger() {
return LOG;
}

@VisibleForTesting
public static FaultInjector getInjector() {
return injector;
}

@VisibleForTesting
public static void setInjector(FaultInjector instance) {
injector = instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,13 @@ public void resume() throws IOException {
@VisibleForTesting
public void reset() throws IOException {
}

@VisibleForTesting
public void setException(Throwable e) {
}

@VisibleForTesting
public Throwable getException() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ enum Result {
DELETE_ON_NON_EMPTY_CONTAINER = 44;
EXPORT_CONTAINER_METADATA_FAILED = 45;
IMPORT_CONTAINER_METADATA_FAILED = 46;
BLOCK_ALREADY_FINALIZED = 47;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,10 +1100,11 @@ EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
* @param volumeName - The volume name.
* @param bucketName - The bucket name.
* @param keyName - The key user want to recover.
* @return OmKeyInfo KeyInfo is file under recovery
* @param force - force recover the file.
* @return OmKeyInfo KeyInfo of file under recovery
* @throws IOException if an error occurs
*/
List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) throws IOException;
OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) throws IOException;

/**
* Update modification time and access time of a file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2447,23 +2447,23 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp,
}

@Override
public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) throws IOException {
public OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force)
throws IOException {
RecoverLeaseRequest recoverLeaseRequest =
RecoverLeaseRequest.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
.setForce(force)
.build();

OMRequest omRequest = createOMRequest(Type.RecoverLease)
.setRecoverLeaseRequest(recoverLeaseRequest).build();

RecoverLeaseResponse recoverLeaseResponse =
handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
ArrayList<OmKeyInfo> list = new ArrayList();
list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()));
list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo()));
return list;

return OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo());
}

@Override
Expand Down
Loading