HddsUtils.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
@@ -416,6 +417,16 @@ public static boolean isReadOnly(
     }
   }
 
+  /**
+   * Returns true if the container is in an open-to-write state
+   * (OPEN or RECOVERING).
+   *
+   * @param state - container state
+   */
+  public static boolean isOpenToWriteState(State state) {
+    return state == State.OPEN || state == State.RECOVERING;
+  }
+
   /**
    * Not all datanode container cmd protocol has embedded ozone block token.
    * Block token are issued by Ozone Manager and return to Ozone client to
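The helper gives call sites a single predicate for "writable" container states. A self-contained sketch of the intended call pattern (hypothetical example, not part of this diff; it assumes only the Ozone classes touched above):

    import org.apache.hadoop.hdds.HddsUtils;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;

    public final class WriteStateCheckExample {
      // Mirrors the dispatcher-side gate added later in this PR: writes are
      // allowed while a container is OPEN or RECOVERING, rejected otherwise.
      static void ensureWritable(long containerID, State state) {
        if (!HddsUtils.isOpenToWriteState(state)) {
          throw new IllegalStateException(
              "Container " + containerID + " in " + state + " state");
        }
      }

      public static void main(String[] args) {
        ensureWritable(1L, State.RECOVERING); // accepted under the new predicate
        ensureWritable(2L, State.CLOSED);     // throws IllegalStateException
      }
    }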
ContainerProtocolCalls.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -416,6 +417,23 @@ public static PutSmallFileResponseProto writeSmallFile(
     return response.getPutSmallFile();
   }
 
+  /**
+   * createRecoveringContainer call that creates a container on the datanode.
+   * Currently this is used for EC reconstruction containers. While the EC
+   * reconstruction coordinator is reconstructing containers, the
+   * in-progress containers are created in the RECOVERING state.
+   * @param client - client
+   * @param containerID - ID of container
+   * @param encodedToken - encodedToken if security is enabled
+   * @throws IOException
+   */
+  @InterfaceStability.Evolving
+  public static void createRecoveringContainer(XceiverClientSpi client,
+      long containerID, String encodedToken) throws IOException {
+    createContainerInternal(client, containerID, encodedToken,
+        ContainerProtos.ContainerDataProto.State.RECOVERING);
+  }
+
   /**
    * createContainer call that creates a container on the datanode.
    * @param client - client
@@ -425,11 +443,26 @@ public static PutSmallFileResponseProto writeSmallFile(
    */
   public static void createContainer(XceiverClientSpi client, long containerID,
       String encodedToken) throws IOException {
+    createContainerInternal(client, containerID, encodedToken, null);
+  }
+  /**
+   * createContainer call that creates a container on the datanode.
+   * @param client - client
+   * @param containerID - ID of container
+   * @param encodedToken - encodedToken if security is enabled
+   * @param state - state of the container
+   * @throws IOException
+   */
+  private static void createContainerInternal(XceiverClientSpi client,
+      long containerID, String encodedToken,
+      ContainerProtos.ContainerDataProto.State state) throws IOException {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
-        ContainerProtos.CreateContainerRequestProto
-            .newBuilder();
+        ContainerProtos.CreateContainerRequestProto.newBuilder();
     createRequest.setContainerType(ContainerProtos.ContainerType
         .KeyValueContainer);
+    if (state != null) {
+      createRequest.setState(state);
+    }
 
     String id = client.getPipeline().getFirstNode().getUuidString();
     ContainerCommandRequestProto.Builder request =
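A sketch of how the EC reconstruction coordinator might drive the new call (hypothetical fragment; `xceiverClientManager`, `pipeline`, `containerID`, and `encodedToken` are assumed to be available at the call site, and client lifecycle handling is simplified):

    XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
    try {
      // Creates the target container on the datanode directly in the
      // RECOVERING state, so it stays invisible to SCM reports until
      // reconstruction completes.
      ContainerProtocolCalls.createRecoveringContainer(
          client, containerID, encodedToken);
    } finally {
      xceiverClientManager.releaseClient(client, false);
    }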
ContainerTestHelper.java
@@ -175,19 +175,24 @@ public static ContainerCommandRequestProto getPutBlockRequest(
         .build();
   }
 
+  public static Builder newWriteChunkRequestBuilder(Pipeline pipeline,
+      BlockID blockID, int datalen, int seq) throws IOException {
+    ChunkBuffer data = getData(datalen);
+    return newWriteChunkRequestBuilder(pipeline, blockID, data, seq);
+  }
+
   public static Builder newWriteChunkRequestBuilder(
-      Pipeline pipeline, BlockID blockID, int datalen, int seq)
+      Pipeline pipeline, BlockID blockID, ChunkBuffer data, int seq)
       throws IOException {
     LOG.trace("writeChunk {} (blockID={}) to pipeline={}",
-        datalen, blockID, pipeline);
+        data.limit(), blockID, pipeline);
     ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
         ContainerProtos.WriteChunkRequestProto
             .newBuilder();
 
     writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
 
-    ChunkBuffer data = getData(datalen);
-    ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, datalen);
+    ChunkInfo info = getChunk(blockID.getLocalID(), seq, 0, data.limit());
     setDataChecksum(info, data);
 
     writeRequest.setChunkData(info.getProtoBufMessage());
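The refactor splits payload creation from request building: the old int-length signature now delegates to a new overload that accepts a caller-supplied ChunkBuffer. A usage sketch (hypothetical fragment; `pipeline` and `blockID` construction and imports omitted, and `ChunkBuffer.wrap` is assumed to be the appropriate factory):

    // Tests can now control the exact chunk payload instead of relying on
    // generated data of a given length.
    byte[] payload = "ec-recovery-test-data".getBytes(StandardCharsets.UTF_8);
    ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.wrap(payload));
    ContainerCommandRequestProto.Builder writeChunk =
        ContainerTestHelper.newWriteChunkRequestBuilder(pipeline, blockID, data, 0);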
ContainerSet.java
@@ -246,6 +246,11 @@ public ContainerReportsProto getContainerReport() throws IOException {
     // consumers such as SCM.
     synchronized (this) {
       for (Container<?> container : containers) {
+        if (container.getContainerState()
+            == ContainerProtos.ContainerDataProto.State.RECOVERING) {
+          // Skip the recovering containers in ICR and FCR for now.
+          continue;
+        }
         crBuilder.addReports(container.getContainerReport());
       }
     }
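A mock-based verification sketch of the new filter (hypothetical test fragment; registering the replica with the ContainerSet under test is elided):

    Container<?> recovering = mock(Container.class);
    when(recovering.getContainerState())
        .thenReturn(ContainerProtos.ContainerDataProto.State.RECOVERING);
    // ... register `recovering` with the ContainerSet under test ...
    ContainerReportsProto fullReport = containerSet.getContainerReport();
    // The RECOVERING replica contributes no entry to the full report.
    assertEquals(0, fullReport.getReportsCount());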
HddsDispatcher.java
@@ -344,7 +344,9 @@ private ContainerCommandResponseProto dispatchRequest(
       // only be in open or closing state.
       State containerState = container.getContainerData().getState();
       Preconditions.checkState(
-          containerState == State.OPEN || containerState == State.CLOSING);
+          containerState == State.OPEN
+              || containerState == State.CLOSING
+              || containerState == State.RECOVERING);
       // mark and persist the container state to be unhealthy
       try {
         handler.markContainerUnhealthy(container);
@@ -471,7 +473,8 @@ public void validateContainerCommand(
     }
 
     State containerState = container.getContainerState();
-    if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
+    if (!HddsUtils.isReadOnly(msg)
+        && !HddsUtils.isOpenToWriteState(containerState)) {
       switch (cmdType) {
       case CreateContainer:
         // Create Container is idempotent. There is nothing to validate.
@@ -481,8 +484,8 @@
         // while execution. Nothing to validate here.
         break;
       default:
-        // if the container is not open, no updates can happen. Just throw
-        // an exception
+        // if the container is not open/recovering, no updates can happen. Just
+        // throw an exception
         ContainerNotOpenException cex = new ContainerNotOpenException(
             "Container " + containerID + " in " + containerState + " state");
         audit(action, eventType, params, AuditEventStatus.FAILURE, cex);
Handler.java
@@ -23,6 +23,7 @@
 import java.io.OutputStream;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
@@ -97,6 +98,12 @@ protected String getDatanodeId() {
    */
   protected void sendICR(final Container container)
       throws StorageContainerException {
+    if (container
+        .getContainerState() == ContainerProtos.ContainerDataProto
+        .State.RECOVERING) {
+      // Ignore the recovering containers' reports for now.
+      return;
+    }
     icrSender.send(container);
   }
 
KeyValueContainer.java
@@ -33,6 +33,7 @@
 
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
@@ -304,7 +305,7 @@ public void delete() throws StorageContainerException {
   public void markContainerForClose() throws StorageContainerException {
     writeLock();
     try {
-      if (getContainerState() != ContainerDataProto.State.OPEN) {
+      if (!HddsUtils.isOpenToWriteState(getContainerState())) {
         throw new StorageContainerException(
             "Attempting to close a " + getContainerState() + " container.",
             CONTAINER_NOT_OPEN);
KeyValueHandler.java
@@ -31,6 +31,7 @@
 import java.util.function.Function;
 
 import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -276,6 +277,10 @@ ContainerCommandResponseProto handleCreateContainer(
     KeyValueContainerData newContainerData = new KeyValueContainerData(
         containerID, layoutVersion, maxContainerSize, request.getPipelineID(),
         getDatanodeId());
+    State state = request.getCreateContainer().getState();
+    if (state != null) {
+      newContainerData.setState(state);
+    }
     newContainerData.setReplicaIndex(request.getCreateContainer()
         .getReplicaIndex());
     // TODO: Add support to add metadataList to ContainerData. Add metadata
@@ -911,7 +916,8 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
      * in the leader goes to closing state, will arrive here even the container
      * might already be in closing state here.
      */
-    if (containerState == State.OPEN || containerState == State.CLOSING) {
+    if (containerState == State.OPEN || containerState == State.CLOSING
+        || containerState == State.RECOVERING) {
       return;
     }
 
@@ -972,8 +978,8 @@ public void markContainerForClose(Container container)
       throws IOException {
     container.writeLock();
     try {
-      // Move the container to CLOSING state only if it's OPEN
-      if (container.getContainerState() == State.OPEN) {
+      // Move the container to CLOSING state only if it's OPEN/RECOVERING
+      if (HddsUtils.isOpenToWriteState(container.getContainerState())) {
         container.markContainerForClose();
         sendICR(container);
       }
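A unit-test style sketch of the relaxed close path (hypothetical; `createContainerInState` is an assumed test helper, and this presumes markContainerForClose moves open-to-write containers to CLOSING exactly as it does for OPEN ones):

    @Test
    public void recoveringContainerCanBeMarkedForClose() throws Exception {
      KeyValueContainer container = createContainerInState(State.RECOVERING);
      // Previously this threw CONTAINER_NOT_OPEN for anything but OPEN.
      container.markContainerForClose();
      assertEquals(State.CLOSING, container.getContainerState());
    }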
DatanodeClientProtocol.proto
@@ -246,6 +246,7 @@ message ContainerDataProto {
     UNHEALTHY = 5;
     INVALID = 6;
     DELETED = 7;
+    RECOVERING = 8;
   }
   required int64 containerID = 1;
   repeated KeyValue metadata = 2;
@@ -272,6 +273,7 @@ message CreateContainerRequestProto {
   repeated KeyValue metadata = 2;
   optional ContainerType containerType = 3 [default = KeyValueContainer];
   optional int32 replicaIndex = 4;
+  optional ContainerDataProto.State state = 5;
 }
 
 message CreateContainerResponseProto {
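The wire-level effect of the new optional field, as a client-side sketch (hypothetical fragment): with proto2-generated Java, hasState() tracks field presence and getState() falls back to the enum default when the field is unset, so requests from older clients that never set the field keep today's OPEN behaviour on the datanode.

    ContainerProtos.CreateContainerRequestProto request =
        ContainerProtos.CreateContainerRequestProto.newBuilder()
            .setContainerType(ContainerProtos.ContainerType.KeyValueContainer)
            .setState(ContainerProtos.ContainerDataProto.State.RECOVERING)
            .build();
    // Presence is explicit for proto2 optional fields.
    assert request.hasState();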
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -210,6 +210,7 @@ enum LifeCycleState {
   CLOSED = 4;
   DELETING = 5;
   DELETED = 6; // object is deleted.
+  RECOVERING = 7;
 }
 
 enum LifeCycleEvent {
TestSCMContainerMetrics.java
@@ -50,6 +50,7 @@ public void testSCMContainerMetrics() {
       put(HddsProtos.LifeCycleState.CLOSED.toString(), 5);
       put(HddsProtos.LifeCycleState.DELETING.toString(), 6);
       put(HddsProtos.LifeCycleState.DELETED.toString(), 7);
+      put(HddsProtos.LifeCycleState.RECOVERING.toString(), 8);
     }};
 
 
@@ -78,6 +79,6 @@
     verify(mb, times(1)).addGauge(Interns.info("DeletedContainers",
         "Number of containers in deleted state"), 7);
     verify(mb, times(1)).addGauge(Interns.info("TotalContainers",
-        "Number of all containers"), 27);
+        "Number of all containers"), 35);
   }
 }