@@ -441,6 +441,36 @@ private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
resultMap.put(blockID.getLocalID(), blkDataArr);
}
}
// When a stripe is written, the putBlock is sent to all nodes, even to
// nodes that have zero bytes written to them. If the
// client does not get an ACK from all nodes, it will abandon the stripe,
// which can leave incomplete stripes on the DNs. Therefore, we should check
// that all blocks in the result map have an entry for all nodes. If they
// do not, it means this is an abandoned stripe and we should not attempt
// to reconstruct it.
// Note that if some nodes report different values for the block length,
// it also indicates garbage data at the end of the block. A different part
// of the code handles this and only reconstructs the valid part of the
// block, i.e. the minimum length reported by the nodes.
Iterator<Map.Entry<Long, BlockData[]>> resultIterator
= resultMap.entrySet().iterator();
while (resultIterator.hasNext()) {
Map.Entry<Long, BlockData[]> entry = resultIterator.next();
BlockData[] blockDataArr = entry.getValue();
for (Map.Entry<Integer, DatanodeDetails> e : sourceNodeMap.entrySet()) {
// There should be an entry in the array for each node in the source node
// map. If there is not, this is an orphaned stripe and we should remove it
// from the result.
if (blockDataArr[e.getKey() - 1] == null) {
LOG.warn("In container {} block {} does not have a putBlock entry " +
"for index {} on datanode {} making it an orphan block / " +
"stripe. It will not be reconstructed", containerID,
entry.getKey(), e.getKey(), e.getValue());
resultIterator.remove();
break;
}
}
}
return resultMap;
}
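
To see the pruning pattern above in isolation, here is a minimal, self-contained Java sketch of the same idea using plain collections. The toy types (String[] standing in for BlockData[], node names standing in for DatanodeDetails) and the 3-replica layout are illustrative assumptions, not the actual Ozone classes.

import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class OrphanStripePruneSketch {
  public static void main(String[] args) {
    // Replica index -> node name, standing in for sourceNodeMap.
    SortedMap<Integer, String> sourceNodes = new TreeMap<>();
    sourceNodes.put(1, "dn-a");
    sourceNodes.put(2, "dn-b");
    sourceNodes.put(3, "dn-c");

    // Block localID -> per-replica entries, standing in for resultMap.
    // Slot i holds the putBlock data reported by replica index i + 1.
    SortedMap<Long, String[]> resultMap = new TreeMap<>();
    resultMap.put(1L, new String[] {"blk1@1", "blk1@2", "blk1@3"}); // complete stripe
    resultMap.put(2L, new String[] {"blk2@1", null, "blk2@3"});     // orphan: index 2 missing

    Iterator<Map.Entry<Long, String[]>> it = resultMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, String[]> entry = it.next();
      for (Map.Entry<Integer, String> node : sourceNodes.entrySet()) {
        // A stripe is only reconstructable if every source node reported a
        // putBlock entry for it; otherwise drop it as an orphan.
        if (entry.getValue()[node.getKey() - 1] == null) {
          it.remove();
          break;
        }
      }
    }

    // Prints [1]: only the complete stripe survives the pruning.
    System.out.println(resultMap.keySet());
  }
}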

@@ -31,12 +31,14 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -49,6 +51,7 @@
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -64,12 +67,15 @@
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -83,6 +89,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -148,6 +155,10 @@ public class TestContainerCommandsEC {
private static OzoneConfiguration config;
private static CertificateClient certClient;

private static OzoneBucket classBucket;
private static OzoneVolume classVolume;
private static ReplicationConfig repConfig;

@BeforeAll
public static void init() throws Exception {
config = new OzoneConfiguration();
@@ -157,6 +168,10 @@ public static void init() throws Exception {
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
config.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
config.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
DatanodeConfiguration dnConf = config.getObject(
DatanodeConfiguration.class);
dnConf.setBlockDeletionInterval(Duration.ofSeconds(1));
config.setFromObject(dnConf);
startCluster(config);
prepareData(KEY_SIZE_RANGES);
}
@@ -211,6 +226,170 @@ private Function<Integer, Integer> chunksInReplicaFunc(int i) {
}
}

private void closeAllPipelines(ReplicationConfig replicationConfig) {
scm.getPipelineManager().getPipelines(replicationConfig,
Pipeline.PipelineState.OPEN)
.forEach(p -> {
try {
scm.getPipelineManager().closePipeline(p, false);
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
});
}

@Test
public void testOrphanBlock() throws Exception {
// Close all pipelines so we are guaranteed to get a new one
closeAllPipelines(repConfig);
// First write a full stripe, which is EC_CHUNK_SIZE * EC_DATA bytes.
int keyLen = EC_CHUNK_SIZE * EC_DATA;
String keyName = UUID.randomUUID().toString();
try (OutputStream out = classBucket
.createKey(keyName, keyLen, repConfig, new HashMap<>())) {
out.write(RandomUtils.nextBytes(keyLen));
}
long orphanContainerID = classBucket.getKey(keyName)
.getOzoneKeyLocations().get(0).getContainerID();

PipelineID orphanPipelineID = scm.getContainerManager()
.getContainer(ContainerID.valueOf(orphanContainerID)).getPipelineID();

Pipeline orphanPipeline = scm.getPipelineManager()
.getPipeline(orphanPipelineID);

Token<ContainerTokenIdentifier> orphanContainerToken =
containerTokenGenerator.generateToken(
ANY_USER, new ContainerID(orphanContainerID));

// Close the container by closing the pipeline
scm.getPipelineManager().closePipeline(orphanPipeline, false);

// Find the datanode hosting Replica index = 2
DatanodeDetails dn2 = null;
HddsDatanodeService dn2Service = null;
List<DatanodeDetails> pipelineNodes = orphanPipeline.getNodes();
for (DatanodeDetails node : pipelineNodes) {
if (orphanPipeline.getReplicaIndex(node) == 2) {
dn2 = node;
break;
}
}
// Find the cluster node corresponding to the datanode hosting index 2.
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (dn.getDatanodeDetails().equals(dn2)) {
dn2Service = dn;
break;
}
}

if (dn2 == null || dn2Service == null) {
throw new RuntimeException("Could not find datanode hosting index 2");
}

// Wait for all replicas in the pipeline to report as closed.
GenericTestUtils.waitFor(() -> {
try {
return scm.getContainerManager().getContainerReplicas(
ContainerID.valueOf(orphanContainerID)).stream()
.allMatch(cr -> cr.getState() ==
StorageContainerDatanodeProtocolProtos.
ContainerReplicaProto.State.CLOSED);
} catch (ContainerNotFoundException e) {
throw new RuntimeException(e);
}
}, 500, 10000);

// Get the block ID of the key we have just written. This will be used to
// delete the block from one of the datanodes to make the stripe look like
// an orphan block.
long localID = classBucket.getKey(keyName)
.getOzoneKeyLocations().get(0).getLocalID();

// Create a delete command for the block and send it.
DeleteBlocksCommand deleteBlocksCommand =
new DeleteBlocksCommand(ImmutableList.of(
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
.newBuilder()
.setContainerID(orphanContainerID)
.addLocalID(localID)
.setTxID(1L)
.setCount(10)
.build()));
dn2Service.getDatanodeStateMachine().getContext()
.addCommand(deleteBlocksCommand);

try (XceiverClientGrpc client = new XceiverClientGrpc(
createSingleNodePipeline(orphanPipeline, dn2, 1), cluster.getConf())) {
// Wait for the block to be actually deleted
GenericTestUtils.waitFor(() -> {
try {
ListBlockResponseProto response = ContainerProtocolCalls
.listBlock(client, orphanContainerID, null, Integer.MAX_VALUE,
orphanContainerToken);
for (BlockData bd : response.getBlockDataList()) {
if (bd.getBlockID().getLocalID() == localID) {
return false;
}
}
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}, 500, 30000);
}

ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(
config, certClient, null, ECReconstructionMetrics.create());

// Create a reconstruction command to create a new copy of indexes 4 and 5,
// which means indexes 1 to 3 must be available. However, we know the block
// information is missing for index 2. As all containers in the stripe must
// have the block information, this makes the stripe look like an orphan
// block, where the write went to some nodes but not all.
SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
for (DatanodeDetails node : orphanPipeline.getNodes()) {
if (orphanPipeline.getReplicaIndex(node) <= EC_DATA) {
sourceNodeMap.put(orphanPipeline.getReplicaIndex(node), node);
}
}
// Here we find some spare nodes, i.e. nodes in the cluster that are not in
// the original pipeline.
List<DatanodeDetails> targets = cluster.getHddsDatanodes().stream()
.map(HddsDatanodeService::getDatanodeDetails)
.filter(d -> !orphanPipeline.getNodes().contains(d))
.limit(2)
.collect(Collectors.toList());
SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
for (int j = 0; j < targets.size(); j++) {
targetNodeMap.put(EC_DATA + j + 1, targets.get(j));
}

// Attempt to reconstruct the container.
coordinator.reconstructECContainerGroup(orphanContainerID,
(ECReplicationConfig) repConfig,
sourceNodeMap, targetNodeMap);

// Check the block listing for the recovered container replica with index 4.
// It should be present, but contain no blocks, as the only block in the
// container was an orphan block.
try (XceiverClientGrpc reconClient = new XceiverClientGrpc(
createSingleNodePipeline(orphanPipeline, targetNodeMap.get(4), 4),
cluster.getConf())) {
ListBlockResponseProto response = ContainerProtocolCalls
.listBlock(reconClient, orphanContainerID, null, Integer.MAX_VALUE,
orphanContainerToken);
long count = response.getBlockDataList().stream()
.filter(bd -> bd.getBlockID().getLocalID() == localID)
.count();

Assert.assertEquals(0L, count);
Assert.assertEquals(0, response.getBlockDataList().size());
}
}

@Test
public void testListBlock() throws Exception {
for (int i = 0; i < datanodeDetails.size(); i++) {
@@ -677,17 +856,17 @@ public static void prepareData(int[][] ranges) throws Exception {
final String volumeName = UUID.randomUUID().toString();
final String bucketName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
final ReplicationConfig repConfig =
classVolume = store.getVolume(volumeName);
classVolume.createBucket(bucketName);
classBucket = classVolume.getBucket(bucketName);
repConfig =
new ECReplicationConfig(EC_DATA, EC_PARITY, EC_CODEC, EC_CHUNK_SIZE);
values = new byte[ranges.length][];
for (int i = 0; i < ranges.length; i++) {
int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
values[i] = RandomUtils.nextBytes(keySize);
final String keyName = UUID.randomUUID().toString();
try (OutputStream out = bucket
try (OutputStream out = classBucket
.createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
out.write(values[i]);
}