diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index 99b5800c450c..261073123b7c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -318,21 +318,11 @@ private Lock getLock(long containerID) {
    * swapped into place.
    */
   public Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
-    long containerID = data.getContainerID();
-    File checksumFile = getContainerChecksumFile(data);
     try {
-      if (!checksumFile.exists()) {
-        LOG.debug("No checksum file currently exists for container {} at the path {}", containerID, checksumFile);
-        return Optional.empty();
-      }
-      try (FileInputStream inStream = new FileInputStream(checksumFile)) {
-        return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(),
-            () -> Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream)));
-      }
+      return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(), () -> readChecksumInfo(data));
     } catch (IOException ex) {
       metrics.incrementMerkleTreeReadFailures();
-      throw new IOException("Error occurred when reading container merkle tree for containerID "
-          + data.getContainerID() + " at path " + checksumFile, ex);
+      throw new IOException(ex);
     }
   }
 
@@ -383,6 +373,29 @@ public ByteString getContainerChecksumInfo(KeyValueContainerData data) throws IO
     }
   }
 
+  /**
+   * Reads the container checksum info file (containerID.tree) from the disk.
+   * Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
+   * swapped into place.
+   */
+  public static Optional<ContainerProtos.ContainerChecksumInfo> readChecksumInfo(ContainerData data)
+      throws IOException {
+    long containerID = data.getContainerID();
+    File checksumFile = getContainerChecksumFile(data);
+    try {
+      if (!checksumFile.exists()) {
+        LOG.debug("No checksum file currently exists for container {} at the path {}", containerID, checksumFile);
+        return Optional.empty();
+      }
+      try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+        return Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream));
+      }
+    } catch (IOException ex) {
+      throw new IOException("Error occurred when reading container merkle tree for containerID "
+          + data.getContainerID() + " at path " + checksumFile, ex);
+    }
+  }
+
   @VisibleForTesting
   public ContainerMerkleTreeMetrics getMetrics() {
     return this.metrics;
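Note: the lock-free read contract documented on readChecksumInfo() holds because the writer never modifies containerID.tree in place. A minimal sketch of the tmp-file-and-atomic-swap write pattern the javadoc describes; the class and helper names and the byte-array serialization are illustrative, not the actual ContainerChecksumTreeManager code:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    final class AtomicChecksumWrite {
      // Write the serialized tree to a temp file in the same directory, then
      // atomically rename it over containerID.tree. Readers never observe a
      // partially written file, which is why the read path needs no lock.
      static void write(File checksumFile, byte[] serializedChecksumInfo) throws IOException {
        File tmpFile = new File(checksumFile.getParent(), checksumFile.getName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmpFile)) {
          out.write(serializedChecksumInfo);
          out.getFD().sync(); // make the contents durable before the rename
        }
        Files.move(tmpFile.toPath(), checksumFile.toPath(), StandardCopyOption.ATOMIC_MOVE);
      }
    }

On POSIX filesystems the atomic rename replaces any existing containerID.tree in one step, so a concurrent readChecksumInfo() sees either the old complete file or the new one, never a torn write.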
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index f0d13c14d305..dbf5cfaa8e99 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -26,12 +26,15 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Optional;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerChecksumInfo;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
@@ -277,6 +280,23 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData,
     }
   }
 
+  private static void populateContainerDataChecksum(KeyValueContainerData kvContainerData) {
+    if (kvContainerData.isOpen()) {
+      return;
+    }
+
+    try {
+      Optional<ContainerChecksumInfo> optionalContainerChecksumInfo = ContainerChecksumTreeManager
+          .readChecksumInfo(kvContainerData);
+      if (optionalContainerChecksumInfo.isPresent()) {
+        ContainerChecksumInfo containerChecksumInfo = optionalContainerChecksumInfo.get();
+        kvContainerData.setDataChecksum(containerChecksumInfo.getContainerMerkleTree().getDataChecksum());
+      }
+    } catch (IOException ex) {
+      LOG.warn("Failed to read checksum info for container {}", kvContainerData.getContainerID(), ex);
+    }
+  }
+
   private static void populateContainerMetadata(
       KeyValueContainerData kvContainerData, DatanodeStore store,
       boolean bCheckChunksFilePath)
@@ -356,6 +376,7 @@ private static void populateContainerMetadata(
 
     // Load finalizeBlockLocalIds for container in memory.
     populateContainerFinalizeBlock(kvContainerData, store);
+    populateContainerDataChecksum(kvContainerData);
   }
 
   /**
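Side note on the new helper above: the isPresent()/get() pair in populateContainerDataChecksum() can be collapsed with Optional.map. An equivalent sketch, assuming the same imports as the patch; the surrounding try/catch for IOException is still required either way:

    // Read the persisted tree, project out the aggregate checksum, and cache it
    // on the container data; does nothing when no .tree file exists yet.
    ContainerChecksumTreeManager.readChecksumInfo(kvContainerData)
        .map(info -> info.getContainerMerkleTree().getDataChecksum())
        .ifPresent(kvContainerData::setDataChecksum);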
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
index 987ff7cf81f2..538fd9c15c6f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -201,7 +201,6 @@ public void testDeletedBlocksPreservedOnTreeWrite() throws Exception {
     assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
     List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
     checksumManager.markBlocksAsDeleted(container, new ArrayList<>(expectedBlocksToDelete));
-    assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
     ContainerMerkleTreeWriter tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
     assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 0);
@@ -222,7 +221,6 @@ public void testTreePreservedOnDeletedBlocksWrite() throws Exception {
     assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
     ContainerMerkleTreeWriter tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
-    assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
     List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
     checksumManager.markBlocksAsDeleted(container, new ArrayList<>(expectedBlocksToDelete));
     assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 0);
@@ -242,8 +240,6 @@ public void testReadContainerMerkleTreeMetric() throws Exception {
     assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
     ContainerMerkleTreeWriter tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
-    assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
-    checksumManager.writeContainerDataTree(container, tree);
     assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 0);
     assertTrue(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total() > 0);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 16afcc960850..0e2baeeeb341 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -188,6 +188,7 @@ public String toString() {
             ",replicaIndex=" + replicaIndex : "") +
         ", isEmpty=" + isEmpty +
+        ", dataChecksum=" + dataChecksum +
         '}';
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java
index 1c838a85f88e..8c63ec5caf87 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java
@@ -158,6 +158,9 @@ public void testReplicasAreReportedForClosedContainerAfterRestart()
     // Ensure 3 replicas are reported successfully as expected.
     GenericTestUtils.waitFor(() ->
         getContainerReplicas(newContainer).size() == 3, 200, 30000);
+    for (ContainerReplica replica : getContainerReplicas(newContainer)) {
+      assertNotEquals(0, replica.getDataChecksum());
+    }
   }
 
   /**
@@ -198,6 +201,10 @@ public void testCloseClosedContainer()
       assertTrue(containerChecksumFileExists(hddsDatanode, container));
     }
 
+    for (ContainerReplica replica : getContainerReplicas(container)) {
+      assertNotEquals(0, replica.getDataChecksum());
+    }
+
     assertThrows(IOException.class,
         () -> cluster.getStorageContainerLocationClient()
             .closeContainer(container.getContainerID()),
@@ -269,6 +276,12 @@ public void testContainerChecksumForClosedContainer() throws Exception {
     assertNotEquals(prevExpectedChecksumInfo1.getContainerID(), prevExpectedChecksumInfo2.getContainerID());
     assertNotEquals(prevExpectedChecksumInfo1.getContainerMerkleTree().getDataChecksum(),
         prevExpectedChecksumInfo2.getContainerMerkleTree().getDataChecksum());
+    for (ContainerReplica replica : getContainerReplicas(containerInfo1)) {
+      assertNotEquals(0, replica.getDataChecksum());
+    }
+    for (ContainerReplica replica : getContainerReplicas(containerInfo2)) {
+      assertNotEquals(0, replica.getDataChecksum());
+    }
   }
 
   private boolean checkContainerCloseInDatanode(HddsDatanodeService hddsDatanode,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
index f51dbfed43af..4624cc562ff5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
@@ -50,6 +50,7 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -76,7 +77,9 @@
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -84,6 +87,7 @@
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -110,7 +114,6 @@
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -120,7 +123,7 @@
  */
 public class TestContainerCommandReconciliation {
 
-  private static MiniOzoneCluster cluster;
+  private static MiniOzoneHAClusterImpl cluster;
   private static OzoneClient rpcClient;
   private static ObjectStore store;
   private static OzoneConfiguration conf;
@@ -146,7 +149,9 @@ public static void init() throws Exception {
     conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024, StorageUnit.BYTES);
     conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 512 * 1024, StorageUnit.BYTES);
     // Disable the container scanner so it does not create merkle tree files that interfere with this test.
+    // TODO: Currently the container scrubber sets the checksum to 0. Revert this after HDDS-10374 is merged.
     conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
+    conf.setBoolean("hdds.container.scrub.enabled", false);
     startMiniKdc();
     setSecureConfig();
@@ -343,7 +348,7 @@ public void testContainerChecksumWithBlockMissing() throws Exception {
         .getContainerReplicas(ContainerID.valueOf(containerID))
         .stream().map(ContainerReplica::getDatanodeDetails)
         .collect(Collectors.toList());
-    Assertions.assertEquals(3, dataNodeDetails.size());
+    assertEquals(3, dataNodeDetails.size());
     HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0));
     DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine();
     Container container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
@@ -378,7 +383,7 @@ public void testContainerChecksumWithBlockMissing() throws Exception {
         readChecksumFile(container.getContainerData());
     long dataChecksumAfterBlockDelete = containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
     // Checksum should have changed after block delete.
-    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+    assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
 
     // Since the container is already closed, we have manually updated the container checksum file.
     // This doesn't update the checksum reported to SCM, so we need to trigger an ICR.
@@ -409,7 +414,7 @@ public void testContainerChecksumChunkCorruption() throws Exception {
         .getContainerReplicas(ContainerID.valueOf(containerID))
         .stream().map(ContainerReplica::getDatanodeDetails)
         .collect(Collectors.toList());
-    Assertions.assertEquals(3, dataNodeDetails.size());
+    assertEquals(3, dataNodeDetails.size());
     HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0));
     DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine();
     Container container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
@@ -463,11 +468,11 @@ public void testContainerChecksumChunkCorruption() throws Exception {
     long dataChecksumAfterAfterChunkCorruption = containerChecksumAfterChunkCorruption
         .getContainerMerkleTree().getDataChecksum();
     // Checksum should have changed after chunk corruption.
-    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterAfterChunkCorruption);
+    assertNotEquals(oldDataChecksum, dataChecksumAfterAfterChunkCorruption);
 
     // 3. Set Unhealthy for first chunk of all blocks. This should be done by the scanner; until then this is a
     // manual step.
-    // // TODO: Use On-demand container scanner to build the new container merkle tree (HDDS-10374)
+    // TODO: Use On-demand container scanner to build the new container merkle tree (HDDS-10374)
     Random random = new Random();
     ContainerProtos.ContainerChecksumInfo.Builder builder = containerChecksumAfterChunkCorruption.toBuilder();
     List<ContainerProtos.BlockMerkleTree> blockMerkleTreeList = builder.getContainerMerkleTree()
@@ -498,7 +503,97 @@ public void testContainerChecksumChunkCorruption() throws Exception {
     ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = readChecksumFile(container.getContainerData());
     assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
         newContainerChecksumInfo.getContainerMerkleTree());
-    Assertions.assertEquals(oldDataChecksum, newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum());
+    assertEquals(oldDataChecksum, newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum());
+    TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
+  }
+
+  @Test
+  public void testDataChecksumReportedAtSCM() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back and check its hash.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 * 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = hddsDatanodeService.getDatanodeStateMachine();
+    Container container = datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    long oldDataChecksum = oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+    // Check non-zero checksum after container close.
+    StorageContainerLocationProtocolClientSideTranslatorPB scmClient = cluster.getStorageContainerLocationClient();
+    List<HddsProtos.SCMContainerReplicaProto> containerReplicas = scmClient.getContainerReplicas(containerID,
+        ClientVersion.CURRENT_VERSION);
+    assertEquals(3, containerReplicas.size());
+    for (HddsProtos.SCMContainerReplicaProto containerReplica : containerReplicas) {
+      assertNotEquals(0, containerReplica.getDataChecksum());
+    }
+
+    // 2. Delete some blocks to simulate missing blocks.
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+    String chunksPath = container.getContainerData().getChunksPath();
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDataList.size(); i += 2) {
+        BlockData blockData = blockDataList.get(i);
+        // Delete the block metadata from the container db.
+        db.getStore().getBlockDataTable().deleteWithBatch(op, containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + blockData.getBlockID().getLocalID() + ".block"));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    // TODO: Use On-demand container scanner to build the new container merkle tree. (HDDS-10374)
+    Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // Since the container is already closed, we have manually updated the container checksum file.
+    // This doesn't update the checksum reported to SCM, so we need to trigger an ICR.
+    // Marking a container unhealthy will send an ICR.
+    kvHandler.markContainerUnhealthy(container, MetadataScanResult.deleted());
+    waitForDataChecksumsAtSCM(containerID, 2);
+    scmClient.reconcileContainer(containerID);
+
+    waitForDataChecksumsAtSCM(containerID, 1);
+    // Check non-zero checksum after container reconciliation.
+    containerReplicas = scmClient.getContainerReplicas(containerID, ClientVersion.CURRENT_VERSION);
+    assertEquals(3, containerReplicas.size());
+    for (HddsProtos.SCMContainerReplicaProto containerReplica : containerReplicas) {
+      assertNotEquals(0, containerReplica.getDataChecksum());
+    }
+
+    // Check non-zero checksum after datanode restart.
+    // Restarting all the nodes takes more time in a mini Ozone cluster, so restart only one node.
+    cluster.restartHddsDatanode(0, true);
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      cluster.restartStorageContainerManager(scm, false);
+    }
+    cluster.waitForClusterToBeReady();
+    waitForDataChecksumsAtSCM(containerID, 1);
+    containerReplicas = scmClient.getContainerReplicas(containerID, ClientVersion.CURRENT_VERSION);
+    assertEquals(3, containerReplicas.size());
+    for (HddsProtos.SCMContainerReplicaProto containerReplica : containerReplicas) {
+      assertNotEquals(0, containerReplica.getDataChecksum());
+    }
     TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
   }
 
@@ -622,7 +717,6 @@ private static void startCluster() throws Exception {
         .setSCMServiceId("SecureSCM")
         .setNumOfStorageContainerManagers(3)
         .setNumOfOzoneManagers(1)
-        .setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
     rpcClient = OzoneClientFactory.getRpcClient(conf);
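The assert-every-replica-reports-a-non-zero-checksum loop now appears several times in this test class; a small shared helper (hypothetical, not part of this patch) would keep the assertion in one place:

    // Hypothetical test helper mirroring the loops added in
    // TestContainerCommandReconciliation above: every replica reported by SCM
    // should carry a non-zero data checksum.
    private static void assertReplicasHaveNonZeroDataChecksum(
        StorageContainerLocationProtocolClientSideTranslatorPB scmClient, long containerID) throws Exception {
      List<HddsProtos.SCMContainerReplicaProto> replicas =
          scmClient.getContainerReplicas(containerID, ClientVersion.CURRENT_VERSION);
      assertEquals(3, replicas.size());
      for (HddsProtos.SCMContainerReplicaProto replica : replicas) {
        assertNotEquals(0, replica.getDataChecksum());
      }
    }

Each call site in testDataChecksumReportedAtSCM would then reduce to assertReplicasHaveNonZeroDataChecksum(scmClient, containerID).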