From 33bee608e586a0e2221afe8b34eb3e01d1db7d60 Mon Sep 17 00:00:00 2001 From: tejaskriya Date: Tue, 5 Aug 2025 15:18:51 +0530 Subject: [PATCH 1/6] Add scan for import, export, reconstruction, reconciliation --- .../ECReconstructionCoordinator.java | 2 + .../container/keyvalue/KeyValueHandler.java | 251 +++++++++--------- .../ozoneimpl/ContainerController.java | 10 +- .../replication/ContainerImporter.java | 4 + ...tainerReconciliationWithMockDatanodes.java | 40 ++- .../replication/TestContainerImporter.java | 17 ++ .../scm/storage/TestContainerCommandsEC.java | 9 +- 7 files changed, 199 insertions(+), 134 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 9b869f4a81fa..e0dc9a48001d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -189,6 +189,8 @@ public void reconstructECContainerGroup(long containerID, } metrics.incReconstructionTotal(); metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size()); + // Trigger a container scan after successful reconstruction + context.getParent().getContainer().getContainerSet().scanContainer(containerID); } catch (Exception e) { // Any exception let's delete the recovering containers. metrics.incReconstructionFailsTotal(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index a2dbdcef7f3e..3037ba3059ff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1571,152 +1571,155 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); long containerID = containerData.getContainerID(); - // Obtain the original checksum info before reconciling with any peers. - Optional optionalChecksumInfo = checksumManager.read(containerData); - ContainerProtos.ContainerChecksumInfo originalChecksumInfo; - if (optionalChecksumInfo.isPresent()) { - originalChecksumInfo = optionalChecksumInfo.get(); - } else { - // Try creating the checksum info from RocksDB metadata if it is not present. - originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer); - } - // This holds our current most up-to-date checksum info that we are using for the container. - ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo; - - int successfulPeerCount = 0; - Set allBlocksUpdated = new HashSet<>(); - ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize); - - for (DatanodeDetails peer : peers) { - long numMissingBlocksRepaired = 0; - long numCorruptChunksRepaired = 0; - long numMissingChunksRepaired = 0; - // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the - // next peer. 
- ContainerMerkleTreeWriter updatedTreeWriter = - new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); - - LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", - containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); - // Data checksum updated after each peer reconciles. - long start = Instant.now().toEpochMilli(); - ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( - containerID, peer); - if (peerChecksumInfo == null) { - LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", + try { + // Obtain the original checksum info before reconciling with any peers. + Optional optionalChecksumInfo = checksumManager.read(containerData); + ContainerProtos.ContainerChecksumInfo originalChecksumInfo; + if (optionalChecksumInfo.isPresent()) { + originalChecksumInfo = optionalChecksumInfo.get(); + } else { + // Try creating the checksum info from RocksDB metadata if it is not present. + originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer); + } + // This holds our current most up-to-date checksum info that we are using for the container. + ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo; + + int successfulPeerCount = 0; + Set allBlocksUpdated = new HashSet<>(); + ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize); + + for (DatanodeDetails peer : peers) { + long numMissingBlocksRepaired = 0; + long numCorruptChunksRepaired = 0; + long numMissingChunksRepaired = 0; + // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the + // next peer. + ContainerMerkleTreeWriter updatedTreeWriter = + new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); + + LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", + containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); + // Data checksum updated after each peer reconciles. + long start = Instant.now().toEpochMilli(); + ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( containerID, peer); - continue; - } + if (peerChecksumInfo == null) { + LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", + containerID, peer); + continue; + } - ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo); - Pipeline pipeline = createSingleNodePipeline(peer); + ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo); + Pipeline pipeline = createSingleNodePipeline(peer); + + // Handle missing blocks + for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) { + long localID = missingBlock.getBlockID(); + BlockID blockID = new BlockID(containerID, localID); + if (getBlockManager().blockExists(container, blockID)) { + LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " + + "present. 
Our container merkle tree is stale.", localID, containerID); + } else { + try { + long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID, + missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer); + if (chunksInBlockRetrieved != 0) { + allBlocksUpdated.add(localID); + numMissingBlocksRepaired++; + } + } catch (IOException e) { + LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), + containerID, e); + } + } + } - // Handle missing blocks - for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) { - long localID = missingBlock.getBlockID(); - BlockID blockID = new BlockID(containerID, localID); - if (getBlockManager().blockExists(container, blockID)) { - LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " + - "present. Our container merkle tree is stale.", localID, containerID); - } else { + // Handle missing chunks + for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { + long localID = entry.getKey(); try { - long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID, - missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer); - if (chunksInBlockRetrieved != 0) { + long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), + entry.getValue(), updatedTreeWriter, chunkByteBuffer); + if (missingChunksRepaired != 0) { allBlocksUpdated.add(localID); - numMissingBlocksRepaired++; + numMissingChunksRepaired += missingChunksRepaired; } } catch (IOException e) { - LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), + LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), containerID, e); } } - } - // Handle missing chunks - for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { - long localID = entry.getKey(); - try { - long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), - entry.getValue(), updatedTreeWriter, chunkByteBuffer); - if (missingChunksRepaired != 0) { - allBlocksUpdated.add(localID); - numMissingChunksRepaired += missingChunksRepaired; + // Handle corrupt chunks + for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { + long localID = entry.getKey(); + try { + long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), + entry.getValue(), updatedTreeWriter, chunkByteBuffer); + if (corruptChunksRepaired != 0) { + allBlocksUpdated.add(localID); + numCorruptChunksRepaired += corruptChunksRepaired; + } + } catch (IOException e) { + LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), + containerID, e); } - } catch (IOException e) { - LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), - containerID, e); } - } - // Handle corrupt chunks - for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { - long localID = entry.getKey(); - try { - long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), - entry.getValue(), updatedTreeWriter, chunkByteBuffer); - if (corruptChunksRepaired != 0) { - allBlocksUpdated.add(localID); - numCorruptChunksRepaired += corruptChunksRepaired; + // Based on repaired done with this peer, write the updated merkle tree to 
the container. + // This updated tree will be used when we reconcile with the next peer. + ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; + latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); + + // Log the results of reconciliation with this peer. + long duration = Instant.now().toEpochMilli() - start; + long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo); + long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); + if (previousDataChecksum == latestDataChecksum) { + if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) { + // This condition should never happen. + LOG.error("Checksum of container was not updated but blocks were repaired."); } - } catch (IOException e) { - LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), - containerID, e); + LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms", + containerID, peer, checksumToString(previousDataChecksum), duration); + } else { + LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" + + ".\nMissing blocks repaired: {}/{}\n" + + "Missing chunks repaired: {}/{}\n" + + "Corrupt chunks repaired: {}/{}\n" + + "Time taken: {} ms", + containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum), + numMissingBlocksRepaired, diffReport.getMissingBlocks().size(), + numMissingChunksRepaired, diffReport.getMissingChunks().size(), + numCorruptChunksRepaired, diffReport.getCorruptChunks().size(), + duration); } - } - // Based on repaired done with this peer, write the updated merkle tree to the container. - // This updated tree will be used when we reconcile with the next peer. - ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; - latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); + ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer); + successfulPeerCount++; + } - // Log the results of reconciliation with this peer. - long duration = Instant.now().toEpochMilli() - start; - long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo); + // Log a summary after reconciling with all peers. + long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo); long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); - if (previousDataChecksum == latestDataChecksum) { - if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) { - // This condition should never happen. - LOG.error("Checksum of container was not updated but blocks were repaired."); - } - LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms", - containerID, peer, checksumToString(previousDataChecksum), duration); + if (originalDataChecksum == latestDataChecksum) { + LOG.info("Completed reconciliation for container {} with {}/{} peers. " + + "Original data checksum {} was not updated", + containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum)); } else { - LOG.warn("Container {} reconciled with peer {}. 
Data checksum updated from {} to {}" + - ".\nMissing blocks repaired: {}/{}\n" + - "Missing chunks repaired: {}/{}\n" + - "Corrupt chunks repaired: {}/{}\n" + - "Time taken: {} ms", - containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum), - numMissingBlocksRepaired, diffReport.getMissingBlocks().size(), - numMissingChunksRepaired, diffReport.getMissingChunks().size(), - numCorruptChunksRepaired, diffReport.getCorruptChunks().size(), - duration); - } - - ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer); - successfulPeerCount++; - } - - // Log a summary after reconciling with all peers. - long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo); - long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); - if (originalDataChecksum == latestDataChecksum) { - LOG.info("Completed reconciliation for container {} with {}/{} peers. Original data checksum {} was not updated", - containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum)); - } else { - LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. Data checksum " + - "updated from {} to {}", containerID, successfulPeerCount, peers.size(), allBlocksUpdated.size(), - checksumToString(originalDataChecksum), checksumToString(latestDataChecksum)); - if (LOG.isDebugEnabled()) { - LOG.debug("Blocks updated in container {} after reconciling with {} peers: {}", containerID, - successfulPeerCount, allBlocksUpdated); + LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. Data checksum " + + "updated from {} to {}", containerID, successfulPeerCount, peers.size(), allBlocksUpdated.size(), + checksumToString(originalDataChecksum), checksumToString(latestDataChecksum)); + if (LOG.isDebugEnabled()) { + LOG.debug("Blocks updated in container {} after reconciling with {} peers: {}", containerID, + successfulPeerCount, allBlocksUpdated); + } } + } finally { + // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data. + containerSet.scanContainerWithoutGap(containerID); + sendICR(container); } - - // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data. 
- containerSet.scanContainerWithoutGap(containerID); - sendICR(container); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index 671cf6448bed..8d4dbf08f7e5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -213,8 +213,14 @@ public Container importContainer( public void exportContainer(final ContainerType type, final long containerId, final OutputStream outputStream, final TarContainerPacker packer) throws IOException { - handlers.get(type).exportContainer( - containerSet.getContainer(containerId), outputStream, packer); + try { + handlers.get(type).exportContainer( + containerSet.getContainer(containerId), outputStream, packer); + } catch (IOException e) { + // If export fails, then trigger a scan for the container + containerSet.scanContainer(containerId); + throw e; + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index a0fee26b9b76..a1e31129fb2c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -126,6 +126,10 @@ public void importContainer(long containerID, Path tarFilePath, targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed()); containerSet.addContainerByOverwriteMissingContainer(container); containerSet.scanContainer(containerID); + } catch (Exception e) { + // Trigger a volume scan if the import failed. 
+ StorageVolumeUtil.onFailure(containerData.getVolume()); + throw e; } } finally { importContainerProgress.remove(containerID); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index 7fce76a0e4df..78bf679f2ec7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -88,6 +89,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -211,7 +213,7 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo .map(MockDatanode::getDnDetails) .filter(other -> !current.getDnDetails().equals(other)) .collect(Collectors.toList()); - current.reconcileContainer(dnClient, peers, CONTAINER_ID); + current.reconcileContainerSuccess(dnClient, peers, CONTAINER_ID); } // Reconciliation should have triggered a second on-demand scan for each replica. Wait for them to finish before // checking the results. @@ -221,6 +223,31 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo assertEquals(healthyDataChecksum, repairedDataChecksum); } + @Test + public void testContainerReconciliationFailureContainerScan() + throws Exception { + containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any())) + .thenThrow(new IOException("Simulated failure to get container checksum info")); + + // Use synchronous on-demand scans to re-build the merkle trees after corruption. + datanodes.forEach(d -> d.scanContainer(CONTAINER_ID)); + + // Each datanode should have had one on-demand scan during test setup, and a second one after corruption was + // introduced. + waitForExpectedScanCount(1); + + for (MockDatanode current : datanodes) { + List peers = datanodes.stream() + .map(MockDatanode::getDnDetails) + .filter(other -> !current.getDnDetails().equals(other)) + .collect(Collectors.toList()); + // Reconciliation should fail for each datanode, since the checksum info cannot be retrieved. + assertThrows(IOException.class, () -> current.reconcileContainer(dnClient, peers, CONTAINER_ID)); + } + // Even failure of Reconciliation should have triggered a second on-demand scan for each replica. + waitForExpectedScanCount(2); + } + /** * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on * every datanode. 
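Note: waitForExpectedScanCount, described in the javadoc above, blocks until every datanode's on-demand scanner metrics reach the expected count. A minimal sketch of such a polling helper, assuming a hypothetical getOnDemandScanCount() accessor and an arbitrary 30-second budget (not the test's actual implementation):

    import java.util.List;
    import java.util.concurrent.TimeoutException;

    // Poll every datanode's on-demand scan counter until all reach the target.
    static void waitForScanCount(List<MockDatanode> nodes, int expected)
        throws InterruptedException, TimeoutException {
      long deadline = System.currentTimeMillis() + 30_000;
      while (nodes.stream().anyMatch(d -> d.getOnDemandScanCount() < expected)) {
        if (System.currentTimeMillis() > deadline) {
          throw new TimeoutException("expected " + expected + " on-demand scans per datanode");
        }
        Thread.sleep(100); // re-check every 100 ms
      }
    }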
@@ -438,16 +465,21 @@ public void resetOnDemandScanCount() { onDemandScanner.getMetrics().resetNumContainersScanned(); } - public void reconcileContainer(DNContainerOperationClient client, Collection peers, + public void reconcileContainerSuccess(DNContainerOperationClient client, Collection peers, long containerID) { - log.info("Beginning reconciliation on this mock datanode"); try { - handler.reconcileContainer(client, containerSet.getContainer(containerID), peers); + reconcileContainer(client, peers, containerID); } catch (IOException ex) { fail("Container reconciliation failed", ex); } } + public void reconcileContainer(DNContainerOperationClient client, Collection peers, + long containerID) throws IOException { + log.info("Beginning reconciliation on this mock datanode"); + handler.reconcileContainer(client, containerSet.getContainer(containerID), peers); + } + /** * Create a container with the specified number of blocks. Block data is human-readable so the block files can be * inspected when debugging the test. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java index 6bf5fb6bc741..01571025ea7a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java @@ -28,7 +28,9 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,6 +58,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; @@ -70,6 +73,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.MockedStatic; /** * Test for {@link ContainerImporter}. 
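Note: the mockStatic, times, and MockedStatic imports added above enable the static-mocking verification used in the hunk below. A generic sketch of that Mockito idiom (requires Mockito 3.4+ with the inline mock maker; StaticUtil and runImport() are placeholders, not code from this patch):

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.mockStatic;
    import static org.mockito.Mockito.times;
    import org.mockito.MockedStatic;

    // Static calls are only intercepted while the MockedStatic is open.
    try (MockedStatic<StaticUtil> mocked = mockStatic(StaticUtil.class)) {
      runImport(); // code under test, expected to call StaticUtil.onFailure(...)
      // Verify the static method was invoked exactly once while the mock was active.
      mocked.verify(() -> StaticUtil.onFailure(any()), times(1));
    } // the mock is deregistered when the try-with-resources block closes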
@@ -201,6 +205,19 @@ public void testImportContainerTriggersOnDemandScanner() throws Exception { verify(containerSet, atLeastOnce()).scanContainer(containerId); } + @Test + public void testImportContainerFailureTriggersVolumeScan() throws Exception { + HddsVolume targetVolume = mock(HddsVolume.class); + try (MockedStatic mockedStatic = mockStatic(StorageVolumeUtil.class)) { + when(controllerMock.importContainer(any(ContainerData.class), any(), any())).thenThrow(new IOException()); + // import the container + File tarFile = containerTarFile(containerId, containerData); + assertThrows(IOException.class, () -> containerImporter.importContainer(containerId, tarFile.toPath(), + targetVolume, NO_COMPRESSION)); + mockedStatic.verify(() -> StorageVolumeUtil.onFailure(any()), times(1)); + } + } + @Test public void testImportContainerResetsLastScanTime() throws Exception { containerData.setDataScanTimestamp(Time.monotonicNow()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 4b5847f43b84..b93099da9b31 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -369,7 +369,7 @@ public void testOrphanBlock() throws Exception { try (ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(config, certClient, - secretKeyClient, null, + secretKeyClient, cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext(), ECReconstructionMetrics.create(), "")) { // Attempt to reconstruct the container. 
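Note: the hunks below update each test to pass a real StateContext to ECReconstructionCoordinator instead of null, matching the production change in patch 1, where the coordinator reaches the ContainerSet through the context to flag the rebuilt container (later commits in this series move that trigger into the container close path). Paraphrased from patch 1:

    // Resolve the ContainerSet via the StateContext and queue an
    // on-demand scan of the freshly reconstructed container.
    ContainerSet containerSet = context.getParent().getContainer().getContainerSet();
    containerSet.scanContainer(containerID);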
@@ -682,7 +682,8 @@ private void testECReconstructionCoordinator(List missingIndexes, new XceiverClientManager(config); ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(config, certClient, secretKeyClient, - null, ECReconstructionMetrics.create(), "2")) { + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext(), + ECReconstructionMetrics.create(), "2")) { ECReconstructionMetrics metrics = coordinator.getECReconstructionMetrics(); @@ -915,8 +916,8 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure() assertThrows(IOException.class, () -> { try (ECReconstructionCoordinator coordinator = new ECReconstructionCoordinator(config, certClient, - secretKeyClient, - null, ECReconstructionMetrics.create(), "")) { + secretKeyClient, cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext(), + ECReconstructionMetrics.create(), "")) { coordinator.reconstructECContainerGroup(conID, (ECReplicationConfig) containerPipeline.getReplicationConfig(), sourceNodeMap, targetNodeMap); From 4b6625939bd6147f554cb553cbd4c6d7d534e7d7 Mon Sep 17 00:00:00 2001 From: tejaskriya Date: Tue, 5 Aug 2025 16:11:22 +0530 Subject: [PATCH 2/6] move to new method --- .../container/keyvalue/KeyValueHandler.java | 260 +++++++++--------- 1 file changed, 133 insertions(+), 127 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index b03ec8134ee5..9a63c76c6cb6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1564,156 +1564,162 @@ public void deleteContainer(Container container, boolean force) @Override public void reconcileContainer(DNContainerOperationClient dnClient, Container container, Collection peers) throws IOException { + long containerID = container.getContainerData().getContainerID(); + try { + reconcileContainerInternal(dnClient, container, peers); + } finally { + // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data. + containerSet.scanContainerWithoutGap(containerID, + "Container reconciliation"); + sendICR(container); + } + } + + private void reconcileContainerInternal(DNContainerOperationClient dnClient, Container container, + Collection peers) throws IOException { KeyValueContainer kvContainer = (KeyValueContainer) container; - KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); + KeyValueContainerData containerData = kvContainer.getContainerData(); long containerID = containerData.getContainerID(); - - try { - // Obtain the original checksum info before reconciling with any peers. - ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData); - if (!originalChecksumInfo.hasContainerMerkleTree()) { - // Try creating the merkle tree from RocksDB metadata if it is not present. - originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer); - } - // This holds our current most up-to-date checksum info that we are using for the container. 
- ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo; - - int successfulPeerCount = 0; - Set allBlocksUpdated = new HashSet<>(); - ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize); - - for (DatanodeDetails peer : peers) { - long numMissingBlocksRepaired = 0; - long numCorruptChunksRepaired = 0; - long numMissingChunksRepaired = 0; - // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the - // next peer. - ContainerMerkleTreeWriter updatedTreeWriter = - new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); - - LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", - containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); - // Data checksum updated after each peer reconciles. - long start = Instant.now().toEpochMilli(); - ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( + // Obtain the original checksum info before reconciling with any peers. + ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData); + if (!originalChecksumInfo.hasContainerMerkleTree()) { + // Try creating the merkle tree from RocksDB metadata if it is not present. + originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer); + } + // This holds our current most up-to-date checksum info that we are using for the container. + ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo; + + int successfulPeerCount = 0; + Set allBlocksUpdated = new HashSet<>(); + ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize); + + for (DatanodeDetails peer : peers) { + long numMissingBlocksRepaired = 0; + long numCorruptChunksRepaired = 0; + long numMissingChunksRepaired = 0; + // This will be updated as we do repairs with this peer, + // then used to write the updated tree for the diff with the + // next peer. + ContainerMerkleTreeWriter updatedTreeWriter = + new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); + + LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}", + containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo))); + // Data checksum updated after each peer reconciles. + long start = Instant.now().toEpochMilli(); + ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo( + containerID, peer); + if (peerChecksumInfo == null) { + LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", containerID, peer); - if (peerChecksumInfo == null) { - LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum", - containerID, peer); - continue; - } + continue; + } - ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo); - Pipeline pipeline = createSingleNodePipeline(peer); - - // Handle missing blocks - for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) { - long localID = missingBlock.getBlockID(); - BlockID blockID = new BlockID(containerID, localID); - if (getBlockManager().blockExists(container, blockID)) { - LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " + - "present. 
Our container merkle tree is stale.", localID, containerID); - } else { - try { - long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID, - missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer); - if (chunksInBlockRetrieved != 0) { - allBlocksUpdated.add(localID); - numMissingBlocksRepaired++; - } - } catch (IOException e) { - LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), - containerID, e); - } - } - } + ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo); + Pipeline pipeline = createSingleNodePipeline(peer); - // Handle missing chunks - for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { - long localID = entry.getKey(); + // Handle missing blocks + for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) { + long localID = missingBlock.getBlockID(); + BlockID blockID = new BlockID(containerID, localID); + if (getBlockManager().blockExists(container, blockID)) { + LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " + + "present. Our container merkle tree is stale.", localID, containerID); + } else { try { - long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), - entry.getValue(), updatedTreeWriter, chunkByteBuffer); - if (missingChunksRepaired != 0) { + long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID, + missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer); + if (chunksInBlockRetrieved != 0) { allBlocksUpdated.add(localID); - numMissingChunksRepaired += missingChunksRepaired; + numMissingBlocksRepaired++; } } catch (IOException e) { - LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), + LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(), containerID, e); } } + } - // Handle corrupt chunks - for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { - long localID = entry.getKey(); - try { - long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), - entry.getValue(), updatedTreeWriter, chunkByteBuffer); - if (corruptChunksRepaired != 0) { - allBlocksUpdated.add(localID); - numCorruptChunksRepaired += corruptChunksRepaired; - } - } catch (IOException e) { - LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), - containerID, e); + // Handle missing chunks + for (Map.Entry> entry : diffReport.getMissingChunks().entrySet()) { + long localID = entry.getKey(); + try { + long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), + entry.getValue(), updatedTreeWriter, chunkByteBuffer); + if (missingChunksRepaired != 0) { + allBlocksUpdated.add(localID); + numMissingChunksRepaired += missingChunksRepaired; } + } catch (IOException e) { + LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(), + containerID, e); } + } - // Based on repaired done with this peer, write the updated merkle tree to the container. - // This updated tree will be used when we reconcile with the next peer. 
- ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; - latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); - - // Log the results of reconciliation with this peer. - long duration = Instant.now().toEpochMilli() - start; - long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo); - long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); - if (previousDataChecksum == latestDataChecksum) { - if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) { - // This condition should never happen. - LOG.error("Checksum of container was not updated but blocks were repaired."); + // Handle corrupt chunks + for (Map.Entry> entry : diffReport.getCorruptChunks().entrySet()) { + long localID = entry.getKey(); + try { + long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), + entry.getValue(), updatedTreeWriter, chunkByteBuffer); + if (corruptChunksRepaired != 0) { + allBlocksUpdated.add(localID); + numCorruptChunksRepaired += corruptChunksRepaired; } - LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms", - containerID, peer, checksumToString(previousDataChecksum), duration); - } else { - LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" + - ".\nMissing blocks repaired: {}/{}\n" + - "Missing chunks repaired: {}/{}\n" + - "Corrupt chunks repaired: {}/{}\n" + - "Time taken: {} ms", - containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum), - numMissingBlocksRepaired, diffReport.getMissingBlocks().size(), - numMissingChunksRepaired, diffReport.getMissingChunks().size(), - numCorruptChunksRepaired, diffReport.getCorruptChunks().size(), - duration); + } catch (IOException e) { + LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(), + containerID, e); } - - ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer); - successfulPeerCount++; } - // Log a summary after reconciling with all peers. - long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo); + // Based on repaired done with this peer, write the updated merkle tree to the container. + // This updated tree will be used when we reconcile with the next peer. + ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo; + latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false); + + // Log the results of reconciliation with this peer. + long duration = Instant.now().toEpochMilli() - start; + long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo); long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); - if (originalDataChecksum == latestDataChecksum) { - LOG.info("Completed reconciliation for container {} with {}/{} peers. " + - "Original data checksum {} was not updated", - containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum)); - } else { - LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. 
Data checksum " + - "updated from {} to {}", containerID, successfulPeerCount, peers.size(), allBlocksUpdated.size(), - checksumToString(originalDataChecksum), checksumToString(latestDataChecksum)); - if (LOG.isDebugEnabled()) { - LOG.debug("Blocks updated in container {} after reconciling with {} peers: {}", containerID, - successfulPeerCount, allBlocksUpdated); + if (previousDataChecksum == latestDataChecksum) { + if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) { + // This condition should never happen. + LOG.error("Checksum of container was not updated but blocks were repaired."); } + LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms", + containerID, peer, checksumToString(previousDataChecksum), duration); + } else { + LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" + + ".\nMissing blocks repaired: {}/{}\n" + + "Missing chunks repaired: {}/{}\n" + + "Corrupt chunks repaired: {}/{}\n" + + "Time taken: {} ms", + containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum), + numMissingBlocksRepaired, diffReport.getMissingBlocks().size(), + numMissingChunksRepaired, diffReport.getMissingChunks().size(), + numCorruptChunksRepaired, diffReport.getCorruptChunks().size(), + duration); + } + + ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer); + successfulPeerCount++; + } + + // Log a summary after reconciling with all peers. + long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo); + long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); + if (originalDataChecksum == latestDataChecksum) { + LOG.info("Completed reconciliation for container {} with {}/{} peers. " + + "Original data checksum {} was not updated", + containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum)); + } else { + LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. Data checksum " + + "updated from {} to {}", containerID, successfulPeerCount, peers.size(), allBlocksUpdated.size(), + checksumToString(originalDataChecksum), checksumToString(latestDataChecksum)); + if (LOG.isDebugEnabled()) { + LOG.debug("Blocks updated in container {} after reconciling with {} peers: {}", containerID, + successfulPeerCount, allBlocksUpdated); } - } finally { - // Trigger on demand scanner, which will build the merkle tree based on the newly ingested data. 
- containerSet.scanContainerWithoutGap(containerID, - "Container reconciliation"); - sendICR(container); } } From 039068d378fa493bdcee3e448afe6f241d7f7ec0 Mon Sep 17 00:00:00 2001 From: tejaskriya Date: Tue, 5 Aug 2025 17:13:12 +0530 Subject: [PATCH 3/6] scan reason --- .../ec/reconstruction/ECReconstructionCoordinator.java | 3 ++- .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 8 +++----- .../ozone/container/ozoneimpl/ContainerController.java | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index e0dc9a48001d..f8cb36d648b3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -190,7 +190,8 @@ public void reconstructECContainerGroup(long containerID, metrics.incReconstructionTotal(); metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size()); // Trigger a container scan after successful reconstruction - context.getParent().getContainer().getContainerSet().scanContainer(containerID); + context.getParent().getContainer().getContainerSet().scanContainer(containerID, + "EC reconstruction"); } catch (Exception e) { // Any exception let's delete the recovering containers. metrics.incReconstructionFailsTotal(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 9a63c76c6cb6..29360015b50c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1578,7 +1578,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container private void reconcileContainerInternal(DNContainerOperationClient dnClient, Container container, Collection peers) throws IOException { KeyValueContainer kvContainer = (KeyValueContainer) container; - KeyValueContainerData containerData = kvContainer.getContainerData(); + KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); long containerID = containerData.getContainerID(); // Obtain the original checksum info before reconciling with any peers. ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData); @@ -1597,8 +1597,7 @@ private void reconcileContainerInternal(DNContainerOperationClient dnClient, Con long numMissingBlocksRepaired = 0; long numCorruptChunksRepaired = 0; long numMissingChunksRepaired = 0; - // This will be updated as we do repairs with this peer, - // then used to write the updated tree for the diff with the + // This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the // next peer. 
ContainerMerkleTreeWriter updatedTreeWriter = new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree()); @@ -1709,8 +1708,7 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo); long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo); if (originalDataChecksum == latestDataChecksum) { - LOG.info("Completed reconciliation for container {} with {}/{} peers. " + - "Original data checksum {} was not updated", + LOG.info("Completed reconciliation for container {} with {}/{} peers. Original data checksum {} was not updated", containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum)); } else { LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. Data checksum " + diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index 8d4dbf08f7e5..eab23e5bbd1d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -218,7 +218,7 @@ public void exportContainer(final ContainerType type, containerSet.getContainer(containerId), outputStream, packer); } catch (IOException e) { // If export fails, then trigger a scan for the container - containerSet.scanContainer(containerId); + containerSet.scanContainer(containerId, "Export failed"); throw e; } } From 9553660a9dadefe6accd85a12af84f9bca5fe3cf Mon Sep 17 00:00:00 2001 From: tejaskriya Date: Tue, 5 Aug 2025 17:14:29 +0530 Subject: [PATCH 4/6] scan reason --- .../ec/reconstruction/ECReconstructionCoordinator.java | 3 +-- .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index f8cb36d648b3..1eef59dd4e40 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -190,8 +190,7 @@ public void reconstructECContainerGroup(long containerID, metrics.incReconstructionTotal(); metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size()); // Trigger a container scan after successful reconstruction - context.getParent().getContainer().getContainerSet().scanContainer(containerID, - "EC reconstruction"); + context.getParent().getContainer().getContainerSet().scanContainer(containerID, "EC reconstruction"); } catch (Exception e) { // Any exception let's delete the recovering containers. 
metrics.incReconstructionFailsTotal(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 29360015b50c..df283e074c8d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1580,6 +1580,7 @@ private void reconcileContainerInternal(DNContainerOperationClient dnClient, Con KeyValueContainer kvContainer = (KeyValueContainer) container; KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); long containerID = containerData.getContainerID(); + // Obtain the original checksum info before reconciling with any peers. ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData); if (!originalChecksumInfo.hasContainerMerkleTree()) { From e3eb407ba075e200b278641f071f37a9635b5e4c Mon Sep 17 00:00:00 2001 From: tejaskriya Date: Mon, 18 Aug 2025 01:47:31 +0530 Subject: [PATCH 5/6] change scan location for EC, add test fixes --- .../container/common/interfaces/Handler.java | 5 ++ .../ECReconstructionCoordinator.java | 2 - .../container/keyvalue/KeyValueHandler.java | 19 +++++- .../container/common/ContainerTestUtils.java | 17 +++++- ...tainerReconciliationWithMockDatanodes.java | 59 +++++++++++-------- .../keyvalue/TestKeyValueHandler.java | 40 +++++++++++++ 6 files changed, 110 insertions(+), 32 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index d998d0f7bd91..913473f7a94a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -103,6 +103,11 @@ public String getDatanodeId() { return datanodeId; } + @VisibleForTesting + public ContainerSet getContainerSet() { + return containerSet; + } + /** * This should be called whenever there is state change. It will trigger * an ICR to SCM. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 1eef59dd4e40..9b869f4a81fa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -189,8 +189,6 @@ public void reconstructECContainerGroup(long containerID, } metrics.incReconstructionTotal(); metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size()); - // Trigger a container scan after successful reconstruction - context.getParent().getContainer().getContainerSet().scanContainer(containerID, "EC reconstruction"); } catch (Exception e) { // Any exception let's delete the recovering containers. 
metrics.incReconstructionFailsTotal(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 02b89d2e7902..7bd800aa7826 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -607,8 +607,13 @@ ContainerCommandResponseProto handleCloseContainer( return malformedRequest(request); } try { + ContainerProtos.ContainerDataProto.State currentState = kvContainer.getContainerState(); markContainerForClose(kvContainer); closeContainer(kvContainer); + if (currentState == RECOVERING) { + // trigger container scan for recovering containers, i.e., after EC reconstruction + containerSet.scanContainer(kvContainer.getContainerData().getContainerID(), "EC Reconstruction"); + } } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -1592,10 +1597,22 @@ public void deleteContainer(Container container, boolean force) deleteInternal(container, force); } - @SuppressWarnings("checkstyle:MethodLength") @Override public void reconcileContainer(DNContainerOperationClient dnClient, Container container, Collection peers) throws IOException { + long containerID = container.getContainerData().getContainerID(); + try { + reconcileContainerInternal(dnClient, container, peers); + } finally { + // Trigger on demand scanner after reconciliation + containerSet.scanContainerWithoutGap(containerID, + "Container reconciliation"); + } + } + + @SuppressWarnings("checkstyle:MethodLength") + private void reconcileContainerInternal(DNContainerOperationClient dnClient, Container container, + Collection peers) throws IOException { KeyValueContainer kvContainer = (KeyValueContainer) container; KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData(); long containerID = containerData.getContainerID(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java index 626fabfb3343..ca6b918509dd 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java @@ -207,14 +207,19 @@ public static KeyValueContainer getContainer(long containerId, return new KeyValueContainer(kvData, new OzoneConfiguration()); } + public static KeyValueHandler getKeyValueHandler(ConfigurationSource config, + String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) { + return getKeyValueHandler(config, datanodeId, contSet, volSet, metrics, new ContainerChecksumTreeManager(config)); + } + /** * Constructs an instance of KeyValueHandler that can be used for testing. * This instance can be used for tests that do not need an ICR sender or {@link ContainerChecksumTreeManager}. 
*/ public static KeyValueHandler getKeyValueHandler(ConfigurationSource config, - String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) { - return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c -> { }, - new ContainerChecksumTreeManager(config)); + String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics, + ContainerChecksumTreeManager checksumTreeManager) { + return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c -> { }, checksumTreeManager); } /** @@ -227,6 +232,12 @@ public static KeyValueHandler getKeyValueHandler(ConfigurationSource config, return getKeyValueHandler(config, datanodeId, contSet, volSet, ContainerMetrics.create(config)); } + public static KeyValueHandler getKeyValueHandler(ConfigurationSource config, + String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerChecksumTreeManager checksumTreeManager) { + return getKeyValueHandler(config, datanodeId, contSet, volSet, ContainerMetrics.create(config), + checksumTreeManager); + } + public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf, ContainerSet contSet, VolumeSet volSet, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index f9854502464e..6260b0784fac 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -35,6 +35,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; @@ -73,6 +75,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; @@ -231,31 +234,6 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo assertEquals(healthyDataChecksum, repairedDataChecksum); } - @Test - public void testContainerReconciliationFailureContainerScan() - throws Exception { - containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any())) - .thenThrow(new IOException("Simulated failure to get container checksum info")); - - // Use synchronous on-demand scans to re-build the merkle trees after corruption. - datanodes.forEach(d -> d.scanContainer(CONTAINER_ID)); - - // Each datanode should have had one on-demand scan during test setup, and a second one after corruption was - // introduced. 
-    waitForExpectedScanCount(1);
-
-    for (MockDatanode current : datanodes) {
-      List<DatanodeDetails> peers = datanodes.stream()
-          .map(MockDatanode::getDnDetails)
-          .filter(other -> !current.getDnDetails().equals(other))
-          .collect(Collectors.toList());
-      // Reconciliation should fail for each datanode, since the checksum info cannot be retrieved.
-      assertThrows(IOException.class, () -> current.reconcileContainer(dnClient, peers, CONTAINER_ID));
-    }
-    // Even failure of Reconciliation should have triggered a second on-demand scan for each replica.
-    waitForExpectedScanCount(2);
-  }
-
   /**
    * Enum to represent different failure modes for container protocol calls.
    */
@@ -332,6 +310,30 @@ public void testContainerReconciliationWithPeerFailure(FailureLocation failureLo
     mockContainerProtocolCalls();
   }
 
+  @Test
+  public void testContainerReconciliationFailureContainerScan()
+      throws Exception {
+    // Use synchronous on-demand scans to re-build the merkle trees after corruption.
+    datanodes.forEach(d -> d.scanContainer(CONTAINER_ID));
+
+    // Each datanode should have had one on-demand scan during test setup, and a second one after corruption was
+    // introduced.
+    waitForExpectedScanCount(1);
+
+    for (MockDatanode current : datanodes) {
+      doThrow(IOException.class).when(current.getHandler().getChecksumManager()).read(any());
+      List<DatanodeDetails> peers = datanodes.stream()
+          .map(MockDatanode::getDnDetails)
+          .filter(other -> !current.getDnDetails().equals(other))
+          .collect(Collectors.toList());
+      // Reconciliation should fail for each datanode, since the checksum info cannot be retrieved.
+      assertThrows(IOException.class, () -> current.reconcileContainer(dnClient, peers, CONTAINER_ID));
+      Mockito.reset(current.getHandler().getChecksumManager());
+    }
+    // Even a failed reconciliation should have triggered a second on-demand scan for each replica.
+    waitForExpectedScanCount(2);
+  }
+
   /**
    * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on
    * every datanode.
@@ -448,7 +450,8 @@ private static class MockDatanode {
       containerSet = newContainerSet();
       MutableVolumeSet volumeSet = createVolumeSet();
 
-      handler = ContainerTestUtils.getKeyValueHandler(conf, dnDetails.getUuidString(), containerSet, volumeSet);
+      handler = ContainerTestUtils.getKeyValueHandler(conf, dnDetails.getUuidString(), containerSet, volumeSet,
+          spy(new ContainerChecksumTreeManager(conf)));
       handler.setClusterID(CLUSTER_ID);
 
       ContainerController controller = new ContainerController(containerSet,
@@ -463,6 +466,10 @@ public DatanodeDetails getDnDetails() {
       return dnDetails;
     }
 
+    public KeyValueHandler getHandler() {
+      return handler;
+    }
+
     /**
      * @throws IOException for general IO errors accessing the checksum file
      * @throws java.io.FileNotFoundException When the checksum file does not exist.
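
Note for reviewers: the change above reshapes reconcileContainer into a thin try/finally wrapper around reconcileContainerInternal, so the on-demand scan fires whether reconciliation succeeds or throws; the re-added test asserts exactly that by expecting a second scan even when every checksum read fails. Below is a minimal, self-contained sketch of that pattern, assuming Java 8+; ContainerScanner and ReconcileTask are simplified stand-ins, not the real Ozone types.

import java.io.IOException;

public final class ScanAfterReconcileSketch {

  /** Stand-in for the on-demand scanner dependency. */
  public interface ContainerScanner {
    void scanContainer(long containerId, String reason);
  }

  /** Stand-in for the reconciliation work that may fail. */
  public interface ReconcileTask {
    void run(long containerId) throws IOException;
  }

  static void reconcile(ContainerScanner scanner, ReconcileTask task, long containerId)
      throws IOException {
    try {
      task.run(containerId);
    } finally {
      // Runs on success and on failure alike, so the replica is re-scanned either way.
      scanner.scanContainer(containerId, "Container reconciliation");
    }
  }

  public static void main(String[] args) throws IOException {
    ContainerScanner scanner = (id, reason) -> System.out.println("scan " + id + ": " + reason);
    reconcile(scanner, id -> { }, 42L); // success path: scan still fires
    try {
      reconcile(scanner, id -> { throw new IOException("peer unreachable"); }, 42L);
    } catch (IOException expected) {
      // failure path: the scan fired before this exception propagated
    }
  }
}

Putting the scan in finally rather than in a catch block means a successful repair is re-verified the same way a failed one is, which is what keeps the container merkle tree fresh after reconciliation.
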
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 17efe57e5964..40aa7c3e7f2f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -40,6 +40,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.atMostOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -392,6 +393,45 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
         "Close container should return Invalid container error");
   }
 
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testCloseRecoveringContainerTriggersScan(ContainerLayoutVersion layoutVersion) {
+    final ContainerSet containerSet = spy(newContainerSet());
+    final KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
+        DATANODE_UUID, containerSet, mock(MutableVolumeSet.class), mock(ContainerMetrics.class),
+        c -> { }, new ContainerChecksumTreeManager(conf));
+
+    conf = new OzoneConfiguration();
+    KeyValueContainerData kvData = new KeyValueContainerData(DUMMY_CONTAINER_ID,
+        layoutVersion,
+        (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
+        UUID.randomUUID().toString());
+    kvData.setMetadataPath(tempDir.toString());
+    kvData.setDbFile(dbFile.toFile());
+    KeyValueContainer container = new KeyValueContainer(kvData, conf);
+    ContainerCommandRequestProto createContainerRequest =
+        createContainerRequest(DATANODE_UUID, DUMMY_CONTAINER_ID);
+    keyValueHandler.handleCreateContainer(createContainerRequest, container);
+
+    // Set the container state to RECOVERING, as it would be after EC reconstruction.
+    kvData.setState(State.RECOVERING);
+
+    // Create Close container request
+    ContainerCommandRequestProto closeContainerRequest =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.CloseContainer)
+            .setContainerID(DUMMY_CONTAINER_ID)
+            .setDatanodeUuid(DATANODE_UUID)
+            .setCloseContainer(ContainerProtos.CloseContainerRequestProto
+                .getDefaultInstance())
+            .build();
+    dispatcher.dispatch(closeContainerRequest, null);
+
+    keyValueHandler.handleCloseContainer(closeContainerRequest, container);
+
+    verify(keyValueHandler.getContainerSet(), atLeastOnce())
+        .scanContainer(DUMMY_CONTAINER_ID, "EC Reconstruction");
+  }
+
   @Test
   public void testCreateContainerWithFailure() throws Exception {
     final String testDir = tempDir.toString();

From b5ceaaf49c484b362f071988b6baf99a2ad7684d Mon Sep 17 00:00:00 2001
From: tejaskriya
Date: Thu, 21 Aug 2025 19:25:47 +0530
Subject: [PATCH 6/6] change var names and improve mock

---
 .../ozone/container/common/interfaces/Handler.java       | 5 -----
 .../hadoop/ozone/container/keyvalue/KeyValueHandler.java | 6 +++---
 .../ozone/container/keyvalue/TestKeyValueHandler.java    | 6 ++----
 .../hadoop/hdds/scm/storage/TestContainerCommandsEC.java | 9 ++++-----
 4 files changed, 9 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 913473f7a94a..d998d0f7bd91 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -103,11 +103,6 @@ public String getDatanodeId() {
     return datanodeId;
   }
 
-  @VisibleForTesting
-  public ContainerSet getContainerSet() {
-    return containerSet;
-  }
-
   /**
    * This should be called whenever there is state change. It will trigger
    * an ICR to SCM.
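
The new test relies on a standard Mockito idiom: spy (or mock) the collaborator, drive the code under test, then verify the side effect with verify(..., atLeastOnce()). For readers less familiar with it, here is a compact runnable sketch, assuming mockito-core is on the classpath; FakeContainerSet is a local stand-in rather than the Ozone ContainerSet.

import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public final class VerifyScanSketch {

  /** Local stand-in for the real ContainerSet; non-final so Mockito can spy it. */
  public static class FakeContainerSet {
    public void scanContainer(long containerId, String reason) {
      // real scanning logic elided
    }
  }

  public static void main(String[] args) {
    FakeContainerSet containerSet = spy(new FakeContainerSet());

    // The code under test would do this when closing a RECOVERING container.
    containerSet.scanContainer(7L, "EC Reconstruction");

    // Throws if scanContainer(7, "EC Reconstruction") was never invoked on the spy.
    verify(containerSet, atLeastOnce()).scanContainer(7L, "EC Reconstruction");
  }
}

PATCH 6/6 tightens this by verifying against the shared mockContainerSet directly (next hunk), which is what allows the @VisibleForTesting getContainerSet() accessor to be dropped from Handler above.
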
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 7bd800aa7826..327820fa0891 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -607,11 +607,11 @@ ContainerCommandResponseProto handleCloseContainer(
       return malformedRequest(request);
     }
     try {
-      ContainerProtos.ContainerDataProto.State currentState = kvContainer.getContainerState();
+      ContainerProtos.ContainerDataProto.State previousState = kvContainer.getContainerState();
       markContainerForClose(kvContainer);
       closeContainer(kvContainer);
-      if (currentState == RECOVERING) {
-        // trigger container scan for recovering containers, i.e., after EC reconstruction
+      if (previousState == RECOVERING) {
+        // trigger container scan for recovered containers, i.e., after EC reconstruction
         containerSet.scanContainer(kvContainer.getContainerData().getContainerID(), "EC Reconstruction");
       }
     } catch (StorageContainerException ex) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 40aa7c3e7f2f..dd69a6cb8b66 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -395,9 +395,8 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
 
   @ContainerLayoutTestInfo.ContainerTest
   public void testCloseRecoveringContainerTriggersScan(ContainerLayoutVersion layoutVersion) {
-    final ContainerSet containerSet = spy(newContainerSet());
     final KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
-        DATANODE_UUID, containerSet, mock(MutableVolumeSet.class), mock(ContainerMetrics.class),
+        DATANODE_UUID, mockContainerSet, mock(MutableVolumeSet.class), mock(ContainerMetrics.class),
         c -> { }, new ContainerChecksumTreeManager(conf));
 
     conf = new OzoneConfiguration();
@@ -428,8 +427,7 @@ DATANODE_UUID, containerSet, mock(MutableVolumeSet.class), mock(ContainerMetric
 
     keyValueHandler.handleCloseContainer(closeContainerRequest, container);
 
-    verify(keyValueHandler.getContainerSet(), atLeastOnce())
-        .scanContainer(DUMMY_CONTAINER_ID, "EC Reconstruction");
+    verify(mockContainerSet, atLeastOnce()).scanContainer(DUMMY_CONTAINER_ID, "EC Reconstruction");
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index b93099da9b31..4b5847f43b84 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -369,7 +369,7 @@ public void testOrphanBlock() throws Exception {
     try (ECReconstructionCoordinator coordinator =
         new ECReconstructionCoordinator(config, certClient,
-            secretKeyClient, cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext(),
+            secretKeyClient, null,
             ECReconstructionMetrics.create(), "")) {
 
       // Attempt to reconstruct the container.
@@ -682,8 +682,7 @@ private void testECReconstructionCoordinator(List<Integer> missingIndexes,
             new XceiverClientManager(config);
         ECReconstructionCoordinator coordinator =
             new ECReconstructionCoordinator(config, certClient, secretKeyClient,
-                cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext(),
-                ECReconstructionMetrics.create(), "2")) {
+                null, ECReconstructionMetrics.create(), "2")) {
 
       ECReconstructionMetrics metrics =
           coordinator.getECReconstructionMetrics();
@@ -916,8 +915,8 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
     assertThrows(IOException.class, () -> {
       try (ECReconstructionCoordinator coordinator =
           new ECReconstructionCoordinator(config, certClient,
-              secretKeyClient, cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().getContext(),
-              ECReconstructionMetrics.create(), "")) {
+              secretKeyClient,
+              null, ECReconstructionMetrics.create(), "")) {
         coordinator.reconstructECContainerGroup(conID,
             (ECReplicationConfig) containerPipeline.getReplicationConfig(),
             sourceNodeMap, targetNodeMap);
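
One closing observation on the TestContainerCommandsEC hunks: the coordinator is now constructed with null in place of the datanode StateContext, which is only safe because the post-reconstruction scan has moved out of the coordinator and into the close-container path, so nothing dereferences the context any more. If such a collaborator ever needs to remain genuinely optional, a null-tolerant trigger is the usual defensive shape; the sketch below is hypothetical (ScanTrigger is an invented stand-in), not the coordinator's actual code.

import java.util.Optional;

public final class OptionalContextSketch {

  /** Hypothetical stand-in for the StateContext-backed scan dependency. */
  public interface ScanTrigger {
    void scanContainer(long containerId);
  }

  private final ScanTrigger trigger; // may be null, e.g. when tests pass null

  OptionalContextSketch(ScanTrigger trigger) {
    this.trigger = trigger;
  }

  void afterReconstruction(long containerId) {
    // Quietly skip the scan when no trigger was wired in, instead of throwing NPE.
    Optional.ofNullable(trigger).ifPresent(t -> t.scanContainer(containerId));
  }

  public static void main(String[] args) {
    new OptionalContextSketch(null).afterReconstruction(1L); // no-op, no NPE
    new OptionalContextSketch(id -> System.out.println("scan " + id)).afterReconstruction(2L);
  }
}
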