Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,31 @@ public void closeContainer(long containerID, DatanodeDetails dn,
}
}

/**
 * Deletes the container on the given datanode, but only when the container
 * is still in the RECOVERING state. The state is re-read from the datanode
 * immediately before deletion; a race with a concurrent state transition is
 * still possible, but the pre-check covers the common case.
 *
 * @param containerID id of the container to delete
 * @param dn datanode hosting the recovering container replica
 * @param repConfig EC replication config used to build the single-node pipeline
 * @param encodedToken container token for the datanode calls
 * @throws IOException if reading or deleting the container fails
 */
public void deleteRecoveringContainer(long containerID, DatanodeDetails dn,
    ECReplicationConfig repConfig, String encodedToken) throws IOException {
  XceiverClientSpi client = this.xceiverClientManager
      .acquireClient(singleNodePipeline(dn, repConfig));
  try {
    // Re-check the container state on the datanode so we do not delete a
    // container that has already moved out of RECOVERING.
    ContainerProtos.ReadContainerResponseProto response =
        ContainerProtocolCalls.readContainer(client, containerID, encodedToken);
    ContainerProtos.ContainerDataProto.State state =
        response.getContainerData().getState();
    if (state == ContainerProtos.ContainerDataProto.State.RECOVERING) {
      ContainerProtocolCalls
          .deleteContainer(client, containerID, true, encodedToken);
    } else {
      LOG.warn("Container will not be deleted as it is not a recovering"
          + " container {}", containerID);
    }
  } finally {
    // Release without invalidating the cached client.
    this.xceiverClientManager.releaseClient(client, false);
  }
}

public void createRecoveringContainer(long containerID, DatanodeDetails dn,
ECReplicationConfig repConfig, String encodedToken, int replicaIndex)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,25 +135,52 @@ public void reconstructECContainerGroup(long containerID,

// 1. create target recovering containers.
String containerToken = encode(tokenHelper.getContainerToken(cid));
for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
.entrySet()) {
DatanodeDetails dn = indexDnPair.getValue();
Integer index = indexDnPair.getKey();
containerOperationClient.createRecoveringContainer(containerID, dn,
repConfig, containerToken, index);
}
List<DatanodeDetails> recoveringContainersCreatedDNs = new ArrayList<>();
try {
for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
.entrySet()) {
DatanodeDetails dn = indexDnPair.getValue();
Integer index = indexDnPair.getKey();
containerOperationClient
.createRecoveringContainer(containerID, dn, repConfig,
containerToken, index);
recoveringContainersCreatedDNs.add(dn);
}

// 2. Reconstruct and transfer to targets
for (BlockLocationInfo blockLocationInfo : blockLocationInfoMap.values()) {
reconstructECBlockGroup(blockLocationInfo, repConfig, targetNodeMap);
}
// 2. Reconstruct and transfer to targets
for (BlockLocationInfo blockLocationInfo : blockLocationInfoMap
.values()) {
reconstructECBlockGroup(blockLocationInfo, repConfig, targetNodeMap);
}

// 3. Close containers
for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
.entrySet()) {
DatanodeDetails dn = indexDnPair.getValue();
containerOperationClient.closeContainer(containerID, dn, repConfig,
containerToken);
// 3. Close containers
for (DatanodeDetails dn: recoveringContainersCreatedDNs) {
containerOperationClient
.closeContainer(containerID, dn, repConfig, containerToken);
}
} catch (Exception e) {
// Any exception let's delete the recovering containers.
LOG.warn(
"Exception while reconstructing the container {}. Cleaning up"
+ " all the recovering containers in the reconstruction process.",
containerID, e);
// Delete only the current thread successfully created recovering
// containers.
for (DatanodeDetails dn : recoveringContainersCreatedDNs) {
try {
containerOperationClient
.deleteRecoveringContainer(containerID, dn, repConfig,
containerToken);
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted the container {}, at the target: {}",
containerID, dn);
}
} catch (IOException ioe) {
LOG.error("Exception while deleting the container {} at target: {}",
containerID, dn, ioe);
}
throw e;
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
Expand Down Expand Up @@ -268,7 +269,12 @@ ContainerCommandResponseProto handleCreateContainer(
}
// Create Container request should be passed a null container as the
// container would be created here.
Preconditions.checkArgument(kvContainer == null);
if (kvContainer != null) {
return ContainerUtils.logAndReturnError(LOG,
new StorageContainerException(
"Container creation failed because " + "key value container" +
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
}

long containerID = request.getContainerID();

Expand Down