diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java index 5a50b07bb3b4..a4750b5fae01 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java @@ -55,6 +55,7 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.AbstractConstruct; import org.yaml.snakeyaml.constructor.SafeConstructor; +import org.yaml.snakeyaml.error.YAMLException; import org.yaml.snakeyaml.introspector.BeanAccess; import org.yaml.snakeyaml.introspector.Property; import org.yaml.snakeyaml.introspector.PropertyUtils; @@ -167,8 +168,20 @@ public static ContainerData readContainer(InputStream input) Yaml yaml = new Yaml(containerDataConstructor, representer); yaml.setBeanAccess(BeanAccess.FIELD); - containerData = (ContainerData) - yaml.load(input); + try { + containerData = yaml.load(input); + } catch (YAMLException ex) { + // Unchecked exception. Convert to IOException since an error with one + // container file is not fatal for the whole thread or datanode. + throw new IOException(ex); + } + + if (containerData == null) { + // If Yaml#load returned null, then the file is empty. This is valid yaml + // but considered an error in this case since we have lost data about + // the container. + throw new IOException("Failed to load container file. File is empty."); + } return containerData; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java index 599b15ab8fc6..f2b879706b7b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java @@ -42,6 +42,10 @@ public class ContainerScannerConfiguration { // only for log public static final String HDDS_CONTAINER_SCRUB_ENABLED = "hdds.container.scrub.enabled"; + public static final String HDDS_CONTAINER_SCRUB_DEV_DATA_ENABLED = + "hdds.container.scrub.dev.data.enabled"; + public static final String HDDS_CONTAINER_SCRUB_DEV_METADATA_ENABLED = + "hdds.container.scrub.dev.metadata.enabled"; public static final String METADATA_SCAN_INTERVAL_KEY = "hdds.container.scrub.metadata.scan.interval"; public static final String DATA_SCAN_INTERVAL_KEY = @@ -69,9 +73,25 @@ public class ContainerScannerConfiguration { type = ConfigType.BOOLEAN, defaultValue = "false", tags = {ConfigTag.STORAGE}, - description = "Config parameter to enable container scanner.") + description = "Config parameter to enable all container scanners.") private boolean enabled = false; + @Config(key = "dev.data.scan.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "true", + tags = {ConfigTag.STORAGE}, + description = "Can be used to disable the background container data " + + "scanner for developer testing purposes.") + private boolean dataScanEnabled = true; + + @Config(key = "dev.metadata.scan.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "true", + tags = {ConfigTag.STORAGE}, + description = "Can be used to disable the background container metadata" + + " scanner for developer testing purposes.") + private boolean metadataScanEnabled = true; + @Config(key = "metadata.scan.interval", type = ConfigType.TIME, defaultValue = "3h", @@ -163,6 +183,14 @@ public boolean isEnabled() { return enabled; } + public boolean isDataScanEnabled() { + return dataScanEnabled; + } + + public boolean isMetadataScanEnabled() { + return metadataScanEnabled; + } + public void setMetadataScanInterval(long metadataScanInterval) { this.metadataScanInterval = metadataScanInterval; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index dbb3832b9450..69c53b8b3ce9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -329,8 +329,16 @@ private void startContainerScrub() { return; } initOnDemandContainerScanner(c); - initMetadataScanner(c); - initContainerScanner(c); + + // This config is for testing the scanners in isolation. + if (c.isMetadataScanEnabled()) { + initMetadataScanner(c); + } + + // This config is for testing the scanners in isolation. + if (c.isDataScanEnabled()) { + initContainerScanner(c); + } } private void initContainerScanner(ContainerScannerConfiguration c) { diff --git a/hadoop-hdds/container-service/src/test/resources/log4j.properties b/hadoop-hdds/container-service/src/test/resources/log4j.properties index 398786689af3..a1050ce18e77 100644 --- a/hadoop-hdds/container-service/src/test/resources/log4j.properties +++ b/hadoop-hdds/container-service/src/test/resources/log4j.properties @@ -21,3 +21,4 @@ log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n + diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java new file mode 100644 index 000000000000..41c44eebdc1c --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerDataScannerIntegration.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.hadoop.ozone.dn.scanner; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.ozoneimpl.BackgroundContainerDataScanner; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +/** + * Integration tests for the background container data scanner. This scanner + * checks all data and metadata in the container. + */ +@RunWith(Parameterized.class) +public class TestBackgroundContainerDataScannerIntegration + extends TestContainerScannerIntegrationAbstract { + + private final ContainerCorruptions corruption; + + @Parameterized.Parameters(name = "{0}") + public static Collection supportedCorruptionTypes() { + // Background container data scanner should be able to detect all errors. + return ContainerCorruptions.getAllParamsExcept(); + } + + @BeforeClass + public static void init() throws Exception { + OzoneConfiguration ozoneConfig = new OzoneConfiguration(); + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_ENABLED, true); + // Make sure the background metadata scanner does not detect failures + // before the data scanner under test does. + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_DEV_METADATA_ENABLED, + false); + // Make the background data scanner run frequently to reduce test time. + ozoneConfig.setTimeDuration( + ContainerScannerConfiguration.DATA_SCAN_INTERVAL_KEY, + SCAN_INTERVAL.getSeconds(), TimeUnit.SECONDS); + buildCluster(ozoneConfig); + } + + public TestBackgroundContainerDataScannerIntegration( + ContainerCorruptions corruption) { + this.corruption = corruption; + } + + /** + * {@link BackgroundContainerDataScanner} should detect corrupted blocks + * in a closed container without client interaction. + */ + @Test + public void testCorruptionDetected() throws Exception { + long containerID = writeDataThenCloseContainer(); + // Container corruption has not yet been introduced. + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + getDnContainer(containerID).getContainerState()); + + corruption.applyTo(getDnContainer(containerID)); + // Wait for the scanner to detect corruption. + GenericTestUtils.waitFor(() -> + getDnContainer(containerID).getContainerState() == + ContainerProtos.ContainerDataProto.State.UNHEALTHY, + 500, 5000); + + // Wait for SCM to get a report of the unhealthy replica. + waitForScmToSeeUnhealthyReplica(containerID); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java new file mode 100644 index 000000000000..5dadae316f48 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestBackgroundContainerMetadataScannerIntegration.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.hadoop.ozone.dn.scanner; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.ozone.container.ozoneimpl.BackgroundContainerMetadataScanner; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +/** + * Integration tests for the background container metadata scanner. This + * scanner does a quick check of container metadata to find obvious failures + * faster than a full data scan. + */ +@RunWith(Parameterized.class) +public class TestBackgroundContainerMetadataScannerIntegration + extends TestContainerScannerIntegrationAbstract { + + private final ContainerCorruptions corruption; + + @Parameterized.Parameters(name = "{0}") + public static Collection supportedCorruptionTypes() { + return ContainerCorruptions.getAllParamsExcept( + ContainerCorruptions.MISSING_BLOCK, + ContainerCorruptions.CORRUPT_BLOCK, + ContainerCorruptions.TRUNCATED_BLOCK); + } + + @BeforeClass + public static void init() throws Exception { + OzoneConfiguration ozoneConfig = new OzoneConfiguration(); + // Speed up SCM closing of open container when an unhealthy replica is + // reported. + ReplicationManager.ReplicationManagerConfiguration rmConf = ozoneConfig + .getObject(ReplicationManager.ReplicationManagerConfiguration.class); + rmConf.setInterval(Duration.ofSeconds(1)); + ozoneConfig.setFromObject(rmConf); + + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_ENABLED, true); + // Make sure the background data scanner does not detect failures + // before the metadata scanner under test does. + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_DEV_DATA_ENABLED, + false); + // Make the background metadata scanner run frequently to reduce test time. + ozoneConfig.setTimeDuration( + ContainerScannerConfiguration.METADATA_SCAN_INTERVAL_KEY, + SCAN_INTERVAL.getSeconds(), TimeUnit.SECONDS); + buildCluster(ozoneConfig); + } + + public TestBackgroundContainerMetadataScannerIntegration( + ContainerCorruptions corruption) { + this.corruption = corruption; + } + + /** + * {@link BackgroundContainerMetadataScanner} should detect corrupted metadata + * in open or closed containers without client interaction. + */ + @Test + public void testCorruptionDetected() throws Exception { + // Write data to an open and closed container. + long closedContainerID = writeDataThenCloseContainer(); + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + getDnContainer(closedContainerID).getContainerState()); + long openContainerID = writeDataToOpenContainer(); + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, + getDnContainer(openContainerID).getContainerState()); + + // Corrupt both containers. + corruption.applyTo(getDnContainer(closedContainerID)); + corruption.applyTo(getDnContainer(openContainerID)); + // Wait for the scanner to detect corruption. + GenericTestUtils.waitFor(() -> + getDnContainer(closedContainerID).getContainerState() == + ContainerProtos.ContainerDataProto.State.UNHEALTHY, + 500, 5000); + GenericTestUtils.waitFor(() -> + getDnContainer(openContainerID).getContainerState() == + ContainerProtos.ContainerDataProto.State.UNHEALTHY, + 500, 5000); + + // Wait for SCM to get reports of the unhealthy replicas. + waitForScmToSeeUnhealthyReplica(closedContainerID); + waitForScmToSeeUnhealthyReplica(openContainerID); + // Once the unhealthy replica is reported, the open container's lifecycle + // state in SCM should move to closed. + waitForScmToCloseContainer(openContainerID); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerDataScanners.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerDataScanners.java deleted file mode 100644 index 16519a0001be..000000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerDataScanners.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.hadoop.ozone.dn.scanner; - -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.PlacementPolicy; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; -import org.apache.hadoop.hdds.scm.container.ContainerReplica; -import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; -import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.ozone.HddsDatanodeService; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.client.ObjectStore; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.OzoneVolume; -import org.apache.hadoop.ozone.client.io.KeyOutputStream; -import org.apache.hadoop.ozone.client.io.OzoneInputStream; -import org.apache.hadoop.ozone.client.io.OzoneOutputStream; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.ozoneimpl.BackgroundContainerMetadataScanner; -import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; -import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - -import java.io.File; -import java.io.IOException; -import java.util.Set; -import java.util.UUID; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; -import static org.apache.hadoop.hdds.client.ReplicationType.RATIS; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN; -import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; - -/** - * This class tests the data scanner functionality. - */ -public class TestContainerDataScanners { - - /** - * Set a timeout for each test. - */ - @Rule - public Timeout timeout = Timeout.seconds(300); - private static MiniOzoneCluster cluster; - private static OzoneConfiguration ozoneConfig; - private static OzoneClient ozClient = null; - private static ObjectStore store = null; - private static StorageContainerLocationProtocolClientSideTranslatorPB - storageContainerLocationClient; - - @BeforeClass - public static void init() throws Exception { - ozoneConfig = new OzoneConfiguration(); - ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s"); - ozoneConfig.set(ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_ENABLED, - String.valueOf(true)); - ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, - SCMContainerPlacementCapacity.class, PlacementPolicy.class); - ozoneConfig.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, - false); - cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1) - .build(); - cluster.waitForClusterToBeReady(); - cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 30000); - ozClient = OzoneClientFactory.getRpcClient(ozoneConfig); - store = ozClient.getObjectStore(); - storageContainerLocationClient = - cluster.getStorageContainerLocationClient(); - } - - @AfterClass - public static void shutdown() throws IOException { - if (ozClient != null) { - ozClient.close(); - } - if (storageContainerLocationClient != null) { - storageContainerLocationClient.close(); - } - if (cluster != null) { - cluster.shutdown(); - } - } - - //This test performs 2 separate tests because creating - // and running a cluster is expensive. - @Test - public void testScannersMarkContainerUnhealthy() throws Exception { - String volumeName = UUID.randomUUID().toString(); - String bucketName = UUID.randomUUID().toString(); - String value = "sample key value"; - store.createVolume(volumeName); - OzoneVolume volume = store.getVolume(volumeName); - volume.createBucket(bucketName); - OzoneBucket bucket = volume.getBucket(bucketName); - - String keyNameInClosedContainer = "keyNameInClosedContainer"; - OzoneOutputStream key = createKey(volumeName, bucketName, - keyNameInClosedContainer); - // write data more than 1 chunk - int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2); - byte[] data = ContainerTestHelper - .getFixedLengthString(value, sizeLargerThanOneChunk) - .getBytes(UTF_8); - key.write(data); - - Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); - key.flush(); - TestHelper.waitForContainerClose(key, cluster); - key.close(); - String keyNameInOpenContainer = "keyNameInOpenContainer"; - OzoneOutputStream key2 = createKey(volumeName, bucketName, - keyNameInOpenContainer); - key2.write(data); - key2.close(); - // wait for the container report to propagate to SCM - Thread.sleep(5000); - - Assert.assertEquals(1, cluster.getHddsDatanodes().size()); - - HddsDatanodeService dn = cluster.getHddsDatanodes().get(0); - OzoneContainer oc = dn.getDatanodeStateMachine().getContainer(); - ContainerSet containerSet = oc.getContainerSet(); - //Given an open and a closed container - Assert.assertTrue(containerSet.containerCount() > 1); - Container openContainer = getContainerInState(containerSet, OPEN); - Container closedContainer = getContainerInState(containerSet, CLOSED); - - //When deleting their metadata to make them unhealthy and scanning them - deleteChunksDirForContainer(openContainer); - deleteChunksDirForContainer(closedContainer); - - ContainerScannerConfiguration conf = ozoneConfig.getObject( - ContainerScannerConfiguration.class); - BackgroundContainerMetadataScanner sb = - new BackgroundContainerMetadataScanner(conf, oc.getController()); - //Scan the open container and trigger on-demand scan for the closed one - sb.scanContainer(openContainer); - tryReadKeyWithMissingChunksDir(bucket, keyNameInClosedContainer); - // wait for the incremental container report to propagate to SCM - Thread.sleep(5000); - - ContainerManager cm = cluster.getStorageContainerManager() - .getContainerManager(); - ContainerReplica openContainerReplica = getContainerReplica( - cm, openContainer.getContainerData().getContainerID()); - ContainerReplica closedContainerReplica = getContainerReplica( - cm, closedContainer.getContainerData().getContainerID()); - //Then both containers are marked unhealthy - Assert.assertEquals(State.UNHEALTHY, openContainerReplica.getState()); - Assert.assertEquals(State.UNHEALTHY, closedContainerReplica.getState()); - } - - private ContainerReplica getContainerReplica( - ContainerManager cm, long containerId) throws ContainerNotFoundException { - Set containerReplicas = cm.getContainerReplicas( - ContainerID.valueOf( - containerId)); - Assert.assertEquals(1, containerReplicas.size()); - return containerReplicas.iterator().next(); - } - - //ignore the result of the key read because it is expected to fail - @SuppressWarnings("ResultOfMethodCallIgnored") - private void tryReadKeyWithMissingChunksDir( - OzoneBucket bucket, String keyNameInClosedContainer) throws IOException { - try (OzoneInputStream key = bucket.readKey(keyNameInClosedContainer)) { - Assert.assertThrows(IOException.class, key::read); - } - } - - private void deleteChunksDirForContainer(Container container) { - File chunksDir = new File(container.getContainerData().getContainerPath(), - "chunks"); - deleteDirectory(chunksDir); - Assert.assertFalse(chunksDir.exists()); - } - - private Container getContainerInState( - ContainerSet cs, ContainerProtos.ContainerDataProto.State state) { - return cs.getContainerMap().values().stream() - .filter(c -> state == - c.getContainerState()) - .findAny() - .orElseThrow(() -> - new RuntimeException("No Open container found for testing")); - } - - private OzoneOutputStream createKey(String volumeName, String bucketName, - String keyName) throws Exception { - return TestHelper.createKey( - keyName, RATIS, ONE, 0, store, volumeName, bucketName); - } - - void deleteDirectory(File directoryToBeDeleted) { - File[] allContents = directoryToBeDeleted.listFiles(); - if (allContents != null) { - for (File file : allContents) { - deleteDirectory(file); - } - } - directoryToBeDeleted.delete(); - } -} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java new file mode 100644 index 000000000000..bf6389234bdf --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestContainerScannerIntegrationAbstract.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.hadoop.ozone.dn.scanner; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.TestHelper; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.LambdaTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.Timeout; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.client.ReplicationType.RATIS; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; + +/** + * This class tests the data scanner functionality. + */ +public abstract class TestContainerScannerIntegrationAbstract { + + /** + * Set a timeout for each test. + */ + @Rule + public Timeout timeout = Timeout.seconds(300); + private static MiniOzoneCluster cluster; + private static OzoneClient ozClient = null; + private static ObjectStore store = null; + protected static final Duration SCAN_INTERVAL = Duration.ofSeconds(1); + private static String volumeName; + private static String bucketName; + private static OzoneBucket bucket; + + public static void buildCluster(OzoneConfiguration ozoneConfig) + throws Exception { + // Allow SCM to quickly learn about the unhealthy container. + ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s"); + // Speed up corruption detection by allowing scans of the same container to + // run back to back. + ozoneConfig.setTimeDuration( + ContainerScannerConfiguration.CONTAINER_SCAN_MIN_GAP, + 0, TimeUnit.SECONDS); + + // Build a one datanode cluster. + cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1) + .build(); + cluster.waitForClusterToBeReady(); + cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 30000); + ozClient = OzoneClientFactory.getRpcClient(ozoneConfig); + store = ozClient.getObjectStore(); + + volumeName = UUID.randomUUID().toString(); + bucketName = UUID.randomUUID().toString(); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + bucket = volume.getBucket(bucketName); + } + + @AfterClass + public static void shutdown() throws IOException { + if (ozClient != null) { + ozClient.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + protected void waitForScmToSeeUnhealthyReplica(long containerID) + throws Exception { + ContainerManager scmContainerManager = cluster.getStorageContainerManager() + .getContainerManager(); + LambdaTestUtils.await(5000, 500, + () -> getContainerReplica(scmContainerManager, containerID) + .getState() == State.UNHEALTHY); + } + + protected void waitForScmToCloseContainer(long containerID) throws Exception { + ContainerManager cm = cluster.getStorageContainerManager() + .getContainerManager(); + LambdaTestUtils.await(5000, 500, + () -> cm.getContainer(new ContainerID(containerID)).getState() + != HddsProtos.LifeCycleState.OPEN); + } + + protected Container getDnContainer(long containerID) { + Assert.assertEquals(1, cluster.getHddsDatanodes().size()); + HddsDatanodeService dn = cluster.getHddsDatanodes().get(0); + OzoneContainer oc = dn.getDatanodeStateMachine().getContainer(); + return oc.getContainerSet().getContainer(containerID); + } + + protected long writeDataThenCloseContainer() throws Exception { + return writeDataThenCloseContainer("keyName"); + } + + protected long writeDataThenCloseContainer(String keyName) throws Exception { + OzoneOutputStream key = createKey(keyName); + key.write(getTestData()); + key.flush(); + key.close(); + + long containerID = bucket.getKey(keyName).getOzoneKeyLocations().stream() + .findFirst().get().getContainerID(); + closeContainerAndWait(containerID); + return containerID; + } + + protected void closeContainerAndWait(long containerID) throws Exception { + cluster.getStorageContainerLocationClient().closeContainer(containerID); + + GenericTestUtils.waitFor( + () -> TestHelper.isContainerClosed(cluster, containerID, + cluster.getHddsDatanodes().get(0).getDatanodeDetails()), + 1000, 5000); + } + + protected long writeDataToOpenContainer() throws Exception { + String keyName = "keyName"; + OzoneOutputStream key = createKey(keyName); + key.write(getTestData()); + key.close(); + + return bucket.getKey(keyName).getOzoneKeyLocations().stream() + .findFirst().get().getContainerID(); + } + + protected byte[] getTestData() { + int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2); + return ContainerTestHelper + .getFixedLengthString("sample value", sizeLargerThanOneChunk) + .getBytes(UTF_8); + } + + protected ContainerReplica getContainerReplica( + ContainerManager cm, long containerId) throws ContainerNotFoundException { + Set containerReplicas = cm.getContainerReplicas( + ContainerID.valueOf( + containerId)); + // Only using a single datanode cluster. + Assert.assertEquals(1, containerReplicas.size()); + return containerReplicas.iterator().next(); + } + + //ignore the result of the key read because it is expected to fail + @SuppressWarnings("ResultOfMethodCallIgnored") + protected void readFromCorruptedKey(String keyName) throws IOException { + try (OzoneInputStream key = bucket.readKey(keyName)) { + Assert.assertThrows(IOException.class, key::read); + } + } + + private OzoneOutputStream createKey(String keyName) throws Exception { + return TestHelper.createKey( + keyName, RATIS, ONE, 0, store, volumeName, bucketName); + } + + /** + * Represents a type of container corruption that can be injected into the + * test. + */ + protected enum ContainerCorruptions { + MISSING_CHUNKS_DIR(container -> { + File chunksDir = new File(container.getContainerData().getContainerPath(), + "chunks"); + try { + FileUtils.deleteDirectory(chunksDir); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + Assert.assertFalse(chunksDir.exists()); + }), + + MISSING_METADATA_DIR(container -> { + File metadataDir = + new File(container.getContainerData().getContainerPath(), + "metadata"); + try { + FileUtils.deleteDirectory(metadataDir); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + Assert.assertFalse(metadataDir.exists()); + }), + + MISSING_CONTAINER_FILE(container -> { + File containerFile = container.getContainerFile(); + Assert.assertTrue(containerFile.delete()); + Assert.assertFalse(containerFile.exists()); + }), + + MISSING_CONTAINER_DIR(container -> { + File containerDir = + new File(container.getContainerData().getContainerPath()); + try { + FileUtils.deleteDirectory(containerDir); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + Assert.assertFalse(containerDir.exists()); + }), + + MISSING_BLOCK(container -> { + File chunksDir = new File( + container.getContainerData().getContainerPath(), "chunks"); + for (File blockFile: + chunksDir.listFiles((dir, name) -> name.endsWith(".block"))) { + try { + Files.delete(blockFile.toPath()); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + } + }), + + CORRUPT_CONTAINER_FILE(container -> { + File containerFile = container.getContainerFile(); + corruptFile(containerFile); + }), + + TRUNCATED_CONTAINER_FILE(container -> { + File containerFile = container.getContainerFile(); + truncateFile(containerFile); + }), + + CORRUPT_BLOCK(container -> { + File chunksDir = new File(container.getContainerData().getContainerPath(), + "chunks"); + Optional blockFile = Arrays.stream(Objects.requireNonNull( + chunksDir.listFiles((dir, name) -> name.endsWith(".block")))) + .findFirst(); + Assert.assertTrue(blockFile.isPresent()); + corruptFile(blockFile.get()); + }), + + TRUNCATED_BLOCK(container -> { + File chunksDir = new File(container.getContainerData().getContainerPath(), + "chunks"); + Optional blockFile = Arrays.stream(Objects.requireNonNull( + chunksDir.listFiles((dir, name) -> name.endsWith(".block")))) + .findFirst(); + Assert.assertTrue(blockFile.isPresent()); + truncateFile(blockFile.get()); + }); + + private final Consumer> corruption; + private static final Random RANDOM = new Random(); + + ContainerCorruptions(Consumer> corruption) { + this.corruption = corruption; + } + + public void applyTo(Container container) { + corruption.accept(container); + } + + /** + * Get all container corruption types as parameters for junit 4 + * parameterized tests, except the ones specified. + */ + public static Collection getAllParamsExcept( + ContainerCorruptions... exclude) { + Collection params = new ArrayList<>(); + Set includeSet = + EnumSet.allOf(ContainerCorruptions.class); + Arrays.asList(exclude).forEach(includeSet::remove); + + for (ContainerCorruptions c: values()) { + if (includeSet.contains(c)) { + params.add(new Object[]{c}); + } + } + return params; + } + + /** + * Overwrite the file with random bytes. + */ + private static void corruptFile(File file) { + byte[] corruptedBytes = new byte[(int)file.length()]; + RANDOM.nextBytes(corruptedBytes); + try { + Files.write(file.toPath(), corruptedBytes, + StandardOpenOption.TRUNCATE_EXISTING); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + } + + /** + * Truncate the file to 0 bytes in length. + */ + private static void truncateFile(File file) { + try { + Files.write(file.toPath(), new byte[]{}, + StandardOpenOption.TRUNCATE_EXISTING); + } catch (IOException ex) { + // Fail the test. + throw new UncheckedIOException(ex); + } + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java new file mode 100644 index 000000000000..1b8752b9ea74 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestOnDemandContainerDataScannerIntegration.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.hadoop.ozone.dn.scanner; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * Integration tests for the on demand container data scanner. This scanner + * is triggered when there is an error while a client interacts with a + * container. + */ +@RunWith(Parameterized.class) +public class TestOnDemandContainerDataScannerIntegration + extends TestContainerScannerIntegrationAbstract { + + private final ContainerCorruptions corruption; + + /** + The on-demand container scanner is triggered by errors on the block read + path. Since this may not touch all parts of the container, the scanner is + limited in what errors it can detect: + - The container file is not on the read path, so any errors in this file + will not trigger an on-demand scan. + - With container schema v3 (one RocksDB per volume), RocksDB is not in + the container metadata directory, therefore nothing in this directory is on + the read path. + - Block checksums are verified on the client side. If there is a checksum + error during read, the datanode will not learn about it. + */ + @Parameterized.Parameters(name = "{0}") + public static Collection supportedCorruptionTypes() { + return ContainerCorruptions.getAllParamsExcept( + ContainerCorruptions.MISSING_METADATA_DIR, + ContainerCorruptions.MISSING_CONTAINER_FILE, + ContainerCorruptions.CORRUPT_CONTAINER_FILE, + ContainerCorruptions.TRUNCATED_CONTAINER_FILE, + ContainerCorruptions.CORRUPT_BLOCK, + ContainerCorruptions.TRUNCATED_BLOCK); + } + + @BeforeClass + public static void init() throws Exception { + OzoneConfiguration ozoneConfig = new OzoneConfiguration(); + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_ENABLED, + true); + // Disable both background container scanners to make sure only the + // on-demand scanner is detecting failures. + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_DEV_DATA_ENABLED, + false); + ozoneConfig.setBoolean( + ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_DEV_METADATA_ENABLED, + false); + buildCluster(ozoneConfig); + } + + public TestOnDemandContainerDataScannerIntegration( + ContainerCorruptions corruption) { + this.corruption = corruption; + } + + /** + * {@link OnDemandContainerDataScanner} should detect corrupted blocks + * in a closed container when a client reads from it. + */ + @Test + public void testCorruptionDetected() throws Exception { + String keyName = "testKey"; + long containerID = writeDataThenCloseContainer(keyName); + // Container corruption has not yet been introduced. + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + getDnContainer(containerID).getContainerState()); + // Corrupt the container. + corruption.applyTo(getDnContainer(containerID)); + // This method will check that reading from the corrupted key returns an + // error to the client. + readFromCorruptedKey(keyName); + // Reading from the corrupted key should have triggered an on-demand scan + // of the container, which will detect the corruption. + GenericTestUtils.waitFor(() -> + getDnContainer(containerID).getContainerState() == + ContainerProtos.ContainerDataProto.State.UNHEALTHY, + 500, 5000); + + // Wait for SCM to get a report of the unhealthy replica. + waitForScmToSeeUnhealthyReplica(containerID); + } +}