diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index a7e5bfb92d72..9c41b64006c8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -441,6 +441,36 @@ private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
         resultMap.put(blockID.getLocalID(), blkDataArr);
       }
     }
+    // When a stripe is written, the putBlock is sent to all nodes, even if a
+    // node has zero bytes written to it. If the
+    // client does not get an ACK from all nodes, it will abandon the stripe,
+    // which can leave incomplete stripes on the DNs. Therefore, we should
+    // check that all blocks in the result map have an entry for all nodes.
+    // If they do not, it means this is an abandoned stripe and we should not
+    // attempt to reconstruct it.
+    // Note that if some nodes report different values for the block length,
+    // that also indicates garbage data at the end of the block. A different
+    // part of the code handles this and only reconstructs the valid part of
+    // the block, i.e. the minimum length reported by the nodes.
+    Iterator<Map.Entry<Long, BlockData[]>> resultIterator
+        = resultMap.entrySet().iterator();
+    while (resultIterator.hasNext()) {
+      Map.Entry<Long, BlockData[]> entry = resultIterator.next();
+      BlockData[] blockDataArr = entry.getValue();
+      for (Map.Entry<Integer, DatanodeDetails> e : sourceNodeMap.entrySet()) {
+        // There should be an entry in the array for each node in the source
+        // node map's key set. If there is not, this is an orphaned stripe and
+        // we should remove it from the result.
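+        // Replica indexes are one based, while blockDataArr is zero based,
+        // hence the "- 1" when indexing the array below.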
+        if (blockDataArr[e.getKey() - 1] == null) {
+          LOG.warn("In container {} block {} does not have a putBlock entry " +
+              "for index {} on datanode {} making it an orphan block / " +
+              "stripe. It will not be reconstructed", containerID,
+              entry.getKey(), e.getKey(), e.getValue());
+          resultIterator.remove();
+          break;
+        }
+      }
+    }
     return resultMap;
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 9f96448f3b6c..0f3cdf8d35de 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -31,12 +31,14 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -49,6 +51,7 @@
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -64,12 +67,15 @@
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -83,6 +89,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -148,6 +155,10 @@ public class TestContainerCommandsEC {
   private static OzoneConfiguration config;
   private static CertificateClient certClient;
 
+  private static OzoneBucket classBucket;
+  private static OzoneVolume classVolume;
+  private static ReplicationConfig repConfig;
+
   @BeforeAll
   public static void init() throws Exception {
     config = new OzoneConfiguration();
@@ -157,6 +168,10 @@ public static void init() throws Exception {
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
     config.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
     config.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
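+    // A short block deletion interval lets the block delete issued in
+    // testOrphanBlock below take effect on the datanode quickly.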
+    DatanodeConfiguration dnConf = config.getObject(
+        DatanodeConfiguration.class);
+    dnConf.setBlockDeletionInterval(Duration.ofSeconds(1));
+    config.setFromObject(dnConf);
     startCluster(config);
     prepareData(KEY_SIZE_RANGES);
   }
@@ -211,6 +226,170 @@ private Function<Integer, Integer> chunksInReplicaFunc(int i) {
     }
   }
 
+  private void closeAllPipelines(ReplicationConfig replicationConfig) {
+    scm.getPipelineManager().getPipelines(replicationConfig,
+        Pipeline.PipelineState.OPEN)
+        .forEach(p -> {
+          try {
+            scm.getPipelineManager().closePipeline(p, false);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          } catch (TimeoutException e) {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  @Test
+  public void testOrphanBlock() throws Exception {
+    // Close all pipelines so we are guaranteed to get a new one
+    closeAllPipelines(repConfig);
+    // First write a full stripe, which is chunkSize * dataNum
+    int keyLen = EC_CHUNK_SIZE * EC_DATA;
+    String keyName = UUID.randomUUID().toString();
+    try (OutputStream out = classBucket
+        .createKey(keyName, keyLen, repConfig, new HashMap<>())) {
+      out.write(RandomUtils.nextBytes(keyLen));
+    }
+    long orphanContainerID = classBucket.getKey(keyName)
+        .getOzoneKeyLocations().get(0).getContainerID();
+
+    PipelineID orphanPipelineID = scm.getContainerManager()
+        .getContainer(ContainerID.valueOf(orphanContainerID)).getPipelineID();
+
+    Pipeline orphanPipeline = scm.getPipelineManager()
+        .getPipeline(orphanPipelineID);
+
+    Token<? extends TokenIdentifier> orphanContainerToken =
+        containerTokenGenerator.generateToken(
+            ANY_USER, new ContainerID(orphanContainerID));
+
+    // Close the container by closing the pipeline
+    scm.getPipelineManager().closePipeline(orphanPipeline, false);
+
+    // Find the datanode hosting replica index = 2
+    DatanodeDetails dn2 = null;
+    HddsDatanodeService dn2Service = null;
+    List<DatanodeDetails> pipelineNodes = orphanPipeline.getNodes();
+    for (DatanodeDetails node : pipelineNodes) {
+      if (orphanPipeline.getReplicaIndex(node) == 2) {
+        dn2 = node;
+        break;
+      }
+    }
+    // Find the cluster node corresponding to the datanode hosting index = 2
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      if (dn.getDatanodeDetails().equals(dn2)) {
+        dn2Service = dn;
+        break;
+      }
+    }
+
+    if (dn2 == null || dn2Service == null) {
+      throw new RuntimeException("Could not find datanode hosting index 2");
+    }
+
+    // Wait for all replicas in the pipeline to report as closed.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return scm.getContainerManager().getContainerReplicas(
+            ContainerID.valueOf(orphanContainerID)).stream()
+            .allMatch(cr -> cr.getState() ==
+                StorageContainerDatanodeProtocolProtos.
+                    ContainerReplicaProto.State.CLOSED);
+      } catch (ContainerNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }, 500, 10000);
+
+    // Get the block ID of the key we have just written. This will be used to
+    // delete the block from one of the datanodes to make the stripe look like
+    // an orphan block.
+    long localID = classBucket.getKey(keyName)
+        .getOzoneKeyLocations().get(0).getLocalID();
+
+    // Create a delete command for the block and send it.
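+    // The command is added directly to the datanode's state machine context,
+    // mimicking SCM asking only this node to delete the block, so just the
+    // replica with index 2 loses its putBlock entry.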
+    DeleteBlocksCommand deleteBlocksCommand =
+        new DeleteBlocksCommand(ImmutableList.of(
+            StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+                .newBuilder()
+                .setContainerID(orphanContainerID)
+                .addLocalID(localID)
+                .setTxID(1L)
+                .setCount(10)
+                .build()));
+    dn2Service.getDatanodeStateMachine().getContext()
+        .addCommand(deleteBlocksCommand);
+
+    try (XceiverClientGrpc client = new XceiverClientGrpc(
+        createSingleNodePipeline(orphanPipeline, dn2, 1), cluster.getConf())) {
+      // Wait for the block to be actually deleted
+      GenericTestUtils.waitFor(() -> {
+        try {
+          ListBlockResponseProto response = ContainerProtocolCalls
+              .listBlock(client, orphanContainerID, null, Integer.MAX_VALUE,
+                  orphanContainerToken);
+          for (BlockData bd : response.getBlockDataList()) {
+            if (bd.getBlockID().getLocalID() == localID) {
+              return false;
+            }
+          }
+          return true;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }, 500, 30000);
+    }
+
+    ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(
+        config, certClient, null, ECReconstructionMetrics.create());
+
+    // Create a reconstruction command to create a new copy of indexes 4 and 5,
+    // which means 1 to 3 must be available. However, we know the block
+    // information is missing for index 2. As all containers in the stripe must
+    // have the block information, this makes the stripe look like an orphan
+    // block, where the write went to some nodes but not all.
+    SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+    for (DatanodeDetails node : orphanPipeline.getNodes()) {
+      if (orphanPipeline.getReplicaIndex(node) <= EC_DATA) {
+        sourceNodeMap.put(orphanPipeline.getReplicaIndex(node), node);
+      }
+    }
+    // Here we find some spare nodes, i.e. nodes in the cluster that are not
+    // in the original pipeline.
+    List<DatanodeDetails> targets = cluster.getHddsDatanodes().stream()
+        .map(HddsDatanodeService::getDatanodeDetails)
+        .filter(d -> !orphanPipeline.getNodes().contains(d))
+        .limit(2)
+        .collect(Collectors.toList());
+    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+    for (int j = 0; j < targets.size(); j++) {
+      targetNodeMap.put(EC_DATA + j + 1, targets.get(j));
+    }
+
+    // Attempt to reconstruct the container.
+    coordinator.reconstructECContainerGroup(orphanContainerID,
+        (ECReplicationConfig) repConfig,
+        sourceNodeMap, targetNodeMap);
+
+    // Check the block listing for the recovered container with index 4. It
+    // should be present, but with no blocks, as the only block in the
+    // container was an orphan block.
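+    // Index 4 is the first recovered replica (EC_DATA + 1), hosted on the
+    // first datanode added to targetNodeMap above.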
+    try (XceiverClientGrpc reconClient = new XceiverClientGrpc(
+        createSingleNodePipeline(orphanPipeline, targetNodeMap.get(4), 4),
+        cluster.getConf())) {
+      ListBlockResponseProto response = ContainerProtocolCalls
+          .listBlock(reconClient, orphanContainerID, null, Integer.MAX_VALUE,
+              orphanContainerToken);
+      long count = response.getBlockDataList().stream()
+          .filter(bd -> bd.getBlockID().getLocalID() == localID)
+          .count();
+
+      Assert.assertEquals(0L, count);
+      Assert.assertEquals(0, response.getBlockDataList().size());
+    }
+  }
+
   @Test
   public void testListBlock() throws Exception {
     for (int i = 0; i < datanodeDetails.size(); i++) {
@@ -677,17 +856,17 @@ public static void prepareData(int[][] ranges) throws Exception {
     final String volumeName = UUID.randomUUID().toString();
     final String bucketName = UUID.randomUUID().toString();
     store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-    final ReplicationConfig repConfig =
+    classVolume = store.getVolume(volumeName);
+    classVolume.createBucket(bucketName);
+    classBucket = classVolume.getBucket(bucketName);
+    repConfig =
         new ECReplicationConfig(EC_DATA, EC_PARITY, EC_CODEC, EC_CHUNK_SIZE);
     values = new byte[ranges.length][];
     for (int i = 0; i < ranges.length; i++) {
       int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
       values[i] = RandomUtils.nextBytes(keySize);
       final String keyName = UUID.randomUUID().toString();
-      try (OutputStream out = bucket
+      try (OutputStream out = classBucket
           .createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
         out.write(values[i]);
       }