diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index ff6e199db29b..ea47c4945b8c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -402,12 +402,13 @@ && getMissingContainerSet().contains(containerID)) { || containerState == State.RECOVERING); // mark and persist the container state to be unhealthy try { - // TODO HDDS-7096 + HDDS-8781: Use on demand scanning for the open - // container instead. ContainerScanError error = new ContainerScanError(ContainerScanError.FailureType.WRITE_FAILURE, new File(container.getContainerData().getContainerPath()), new StorageContainerException(result)); handler.markContainerUnhealthy(container, DataScanResult.fromErrors(Collections.singletonList(error))); + // For unhealthy containers, trigger an async on-demand scan to build container merkle tree, + // as the metadata-based tree may not be reliable due to potential data corruption. + containerSet.scanContainerWithoutGap(containerID, "Unhealthy container scan"); LOG.info("Marked Container UNHEALTHY, ContainerID: {}", containerID); } catch (IOException ioe) { // just log the error here in case marking the container fails, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index bea4aa0af844..aee346ec8945 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1493,7 +1493,7 @@ private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Cont public void markContainerUnhealthy(Container container, ScanResult reason) throws IOException { container.writeLock(); - long containerID = 0L; + long containerID = container.getContainerData().getContainerID(); try { if (container.getContainerState() == State.UNHEALTHY) { LOG.debug("Call to mark already unhealthy container {} as unhealthy", diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index 91816e97a77e..e0f255d64058 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -472,6 +472,11 @@ public void resumeContainerScrub() { backgroundScanners.forEach(AbstractBackgroundContainerScanner::unpause); } + @VisibleForTesting + public OnDemandContainerScanner getOnDemandScanner() { + return onDemandScanner; + } + /** * Starts serving requests to ozone container. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java index 7910b6908c96..7cbe2cc65bba 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestCloseContainer.java @@ -34,7 +34,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.time.Duration; @@ -140,7 +139,7 @@ public void testReplicasAreReportedForClosedContainerAfterRestart() // Checksum file exists after container close for (HddsDatanodeService hddsDatanode: hddsDatanodes) { GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, container), 100, 5000); - assertTrue(containerChecksumFileExists(hddsDatanode, container.getContainerID())); + GenericTestUtils.waitFor(() -> containerChecksumFileExists(hddsDatanode, container.getContainerID()), 100, 5000); } long originalSeq = container.getSequenceId(); @@ -197,7 +196,7 @@ public void testCloseClosedContainer() // Checksum file exists after container close for (HddsDatanodeService hddsDatanode: hddsDatanodes) { GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, container), 100, 5000); - assertTrue(containerChecksumFileExists(hddsDatanode, container.getContainerID())); + GenericTestUtils.waitFor(() -> containerChecksumFileExists(hddsDatanode, container.getContainerID()), 100, 5000); } for (ContainerReplica replica : getContainerReplicas(container)) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java index 086d4b41ef75..f99157e7c9f0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java @@ -224,4 +224,9 @@ private OzoneOutputStream createKey(String keyName) throws Exception { protected OzoneConfiguration getConf() { return cluster.getConf(); } + + protected HddsDatanodeService getDatanode() { + assertEquals(1, cluster.getHddsDatanodes().size()); + return cluster.getHddsDatanodes().get(0); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerScannerIntegration.java index d27b78aa593d..e81b3244a965 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerScannerIntegration.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerScannerIntegration.java @@ -26,20 +26,26 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Instant; import java.util.Collection; import java.util.EnumSet; +import java.util.Optional; import java.util.Set; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; +import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.hadoop.ozone.container.common.utils.ContainerLogger; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner; +import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandScannerMetrics; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -182,4 +188,53 @@ void testCorruptionDetectedForOpenContainers(TestContainerCorruptions corruption corruption.assertLogged(openContainerID, 1, logCapturer); } + /** + * Test that {@link OnDemandContainerScanner} is triggered when the HddsDispatcher + * detects write failures and automatically triggers on-demand scans. + */ + @Test + void testOnDemandScanTriggeredByUnhealthyContainer() throws Exception { + long containerID = writeDataToOpenContainer(); + Container container = getDnContainer(containerID); + assertEquals(State.OPEN, container.getContainerState()); + + Optional initialScanTime = container.getContainerData().lastDataScanTime(); + HddsDatanodeService dn = getDatanode(); + ContainerDispatcher dispatcher = dn.getDatanodeStateMachine().getContainer().getDispatcher(); + OnDemandScannerMetrics scannerMetrics = dn.getDatanodeStateMachine().getContainer() + .getOnDemandScanner().getMetrics(); + int initialScannedCount = scannerMetrics.getNumContainersScanned(); + + // Create a PutBlock request with malformed block data to trigger internal error + ContainerProtos.ContainerCommandRequestProto writeFailureRequest = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.PutBlock) + .setContainerID(containerID) + .setDatanodeUuid(dn.getDatanodeDetails().getUuidString()) + .setPutBlock(ContainerProtos.PutBlockRequestProto.newBuilder() + .setBlockData(ContainerProtos.BlockData.newBuilder() + .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(containerID) + .setLocalID(999L) + .setBlockCommitSequenceId(1) + .build()) + .setSize(1024) // Size mismatch with chunks + .build()) + .build()) + .build(); + + ContainerProtos.ContainerCommandResponseProto response = dispatcher.dispatch(writeFailureRequest, null); + assertNotEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + assertEquals(State.UNHEALTHY, container.getContainerState()); + + // The dispatcher should have called containerSet.scanContainerWithoutGap due to the failure + GenericTestUtils.waitFor(() -> { + Optional currentScanTime = container.getContainerData().lastDataScanTime(); + return currentScanTime.isPresent() && currentScanTime.get().isAfter(initialScanTime.orElse(Instant.EPOCH)); + }, 500, 5000); + + int finalScannedCount = scannerMetrics.getNumContainersScanned(); + assertTrue(finalScannedCount > initialScannedCount); + } + }