diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java index 9a17d1fc711..5652db5d71a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java @@ -174,7 +174,7 @@ ContainerReplicaProto getContainerReport() * @return true if the integrity checks pass * Scan the container metadata to detect corruption. */ - boolean scanMetaData(); + boolean scanMetaData() throws InterruptedException; /** * Return if the container data should be checksum verified to detect @@ -193,6 +193,8 @@ ContainerReplicaProto getContainerReport() * I/O bandwidth throttling (e.g. for shutdown purpose). * @return true if the checksum verification succeeds * false otherwise + * @throws InterruptedException if the scan is interrupted. */ - boolean scanData(DataTransferThrottler throttler, Canceler canceler); + boolean scanData(DataTransferThrottler throttler, Canceler canceler) + throws InterruptedException; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java index 863f50de522..f4a5d0b9ad0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java @@ -39,12 +39,10 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.util.ShutdownHookManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,13 +83,13 @@ public class MutableVolumeSet implements VolumeSet { private final String datanodeUuid; private String clusterID; - private Runnable shutdownHook; private final StorageVolumeChecker volumeChecker; private Runnable failedVolumeListener; private StateContext context; private final StorageVolumeFactory volumeFactory; private final StorageVolume.VolumeType volumeType; private int maxVolumeFailuresTolerated; + private boolean initialized; public MutableVolumeSet(String dnUuid, ConfigurationSource conf, StateContext context, StorageVolume.VolumeType volumeType, @@ -103,6 +101,7 @@ public MutableVolumeSet(String dnUuid, String clusterID, ConfigurationSource conf, StateContext context, StorageVolume.VolumeType volumeType, StorageVolumeChecker volumeChecker ) throws IOException { + this.initialized = false; this.context = context; this.datanodeUuid = dnUuid; this.clusterID = clusterID; @@ -201,10 +200,7 @@ private void initializeVolumeSet() throws IOException { } checkAllVolumes(); - - // Ensure volume threads are stopped and scm df is saved during shutdown. - ShutdownHookManager.get().addShutdownHook(this::shutdown, - SHUTDOWN_HOOK_PRIORITY); + initialized = true; } /** @@ -258,7 +254,7 @@ private void handleVolumeFailures( // check failed volume tolerated if (!hasEnoughVolumes()) { // on startup, we could not try to stop uninitialized services - if (shutdownHook == null) { + if (!initialized) { throw new IOException("Don't have enough good volumes on startup," + " bad volumes detected: " + failedVolumes.size() + " max tolerated: " + maxVolumeFailuresTolerated); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 702a9315ec4..85e356388d8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -878,7 +878,7 @@ public boolean shouldScanMetadata() { } @Override - public boolean scanMetaData() { + public boolean scanMetaData() throws InterruptedException { long containerId = containerData.getContainerID(); KeyValueContainerCheck checker = new KeyValueContainerCheck(containerData.getMetadataPath(), config, @@ -900,7 +900,8 @@ public boolean shouldScanData() { } @Override - public boolean scanData(DataTransferThrottler throttler, Canceler canceler) { + public boolean scanData(DataTransferThrottler throttler, Canceler canceler) + throws InterruptedException { if (!shouldScanData()) { throw new IllegalStateException("The checksum verification can not be" + " done for container in state " diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java index 2a83a3892ec..0425ed31b79 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java @@ -87,7 +87,7 @@ public KeyValueContainerCheck(String metadataPath, ConfigurationSource conf, * * @return true : integrity checks pass, false : otherwise. */ - public boolean fastCheck() { + public boolean fastCheck() throws InterruptedException { LOG.debug("Running basic checks for container {};", containerID); boolean valid = false; try { @@ -97,6 +97,10 @@ public boolean fastCheck() { valid = true; } catch (IOException e) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Metadata scan of container " + + containerID + " interrupted."); + } handleCorruption(e); } @@ -114,7 +118,8 @@ public boolean fastCheck() { * * @return true : integrity checks pass, false : otherwise. */ - public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) { + public boolean fullCheck(DataTransferThrottler throttler, + Canceler canceler) throws InterruptedException { boolean valid; try { @@ -123,6 +128,10 @@ public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) { scanData(throttler, canceler); } } catch (IOException e) { + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Data scan of container " + containerID + + " interrupted."); + } handleCorruption(e); valid = false; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java index c956e9a0324..139952d2123 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/AbstractBackgroundContainerScanner.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Base class for scheduled scanners on a Datanode. @@ -37,15 +38,12 @@ public abstract class AbstractBackgroundContainerScanner extends Thread { private final long dataScanInterval; - /** - * True if the thread is stopping.

- * Protected by this object's lock. - */ - private volatile boolean stopping = false; + private final AtomicBoolean stopping; public AbstractBackgroundContainerScanner(String name, long dataScanInterval) { this.dataScanInterval = dataScanInterval; + this.stopping = new AtomicBoolean(false); setName(name); setDaemon(true); } @@ -54,7 +52,7 @@ public AbstractBackgroundContainerScanner(String name, public final void run() { AbstractContainerScannerMetrics metrics = getMetrics(); try { - while (!stopping) { + while (!stopping.get()) { runIteration(); metrics.resetNumContainersScanned(); metrics.resetNumUnhealthyContainers(); @@ -74,7 +72,7 @@ public final void runIteration() { long startTime = System.nanoTime(); scanContainers(); long totalDuration = System.nanoTime() - startTime; - if (stopping) { + if (stopping.get()) { return; } AbstractContainerScannerMetrics metrics = getMetrics(); @@ -94,10 +92,12 @@ public final void runIteration() { public final void scanContainers() { Iterator> itr = getContainerIterator(); - while (!stopping && itr.hasNext()) { + while (!stopping.get() && itr.hasNext()) { Container c = itr.next(); try { scanContainer(c); + } catch (InterruptedException ex) { + stopping.set(true); } catch (IOException ex) { LOG.warn("Unexpected exception while scanning container " + c.getContainerData().getContainerID(), ex); @@ -107,28 +107,35 @@ public final void scanContainers() { public abstract Iterator> getContainerIterator(); - public abstract void scanContainer(Container c) throws IOException; + public abstract void scanContainer(Container c) + throws IOException, InterruptedException; public final void handleRemainingSleep(long remainingSleep) { if (remainingSleep > 0) { try { Thread.sleep(remainingSleep); } catch (InterruptedException ignored) { - this.stopping = true; + stopping.set(true); LOG.warn("Background container scan was interrupted."); Thread.currentThread().interrupt(); } } } + /** + * Shutdown the current container scanning thread. + * If the thread is already being shutdown, the call will block until the + * shutdown completes. + */ public synchronized void shutdown() { - this.stopping = true; - this.interrupt(); - try { - this.join(); - } catch (InterruptedException ex) { - LOG.warn("Unexpected exception while stopping data scanner.", ex); - Thread.currentThread().interrupt(); + if (stopping.compareAndSet(false, true)) { + this.interrupt(); + try { + this.join(); + } catch (InterruptedException ex) { + LOG.warn("Unexpected exception while stopping data scanner.", ex); + Thread.currentThread().interrupt(); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java index 73539840a14..39bb930386a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerDataScanner.java @@ -69,7 +69,8 @@ private boolean shouldScan(Container container) { } @Override - public void scanContainer(Container c) throws IOException { + public void scanContainer(Container c) + throws IOException, InterruptedException { // There is one background container data scanner per volume. // If the volume fails, its scanning thread should terminate. if (volume.isFailed()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java index ee67b68d3d1..75f64883010 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/BackgroundContainerMetadataScanner.java @@ -57,7 +57,8 @@ public Iterator> getContainerIterator() { @VisibleForTesting @Override - public void scanContainer(Container container) throws IOException { + public void scanContainer(Container container) + throws IOException, InterruptedException { // There is one background container metadata scanner per datanode. // If this container's volume has failed, skip the container. // The iterator returned by getContainerIterator may have stale results. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java index 460ad507367..4f0aa042a23 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java @@ -141,6 +141,10 @@ private static void performOnDemandScan(Container container) { } catch (IOException e) { LOG.warn("Unexpected exception while scanning container " + containerId, e); + } catch (InterruptedException ex) { + // This should only happen as part of shutdown, which will stop the + // ExecutorService. + LOG.info("On demand container scan interrupted."); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java index 513197b10b8..a8fc6e42504 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java @@ -178,11 +178,17 @@ public static void setupMockContainer( when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement()); when(c.getContainerData()).thenReturn(data); when(c.shouldScanData()).thenReturn(shouldScanData); - when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class))) - .thenReturn(scanDataSuccess); when(c.shouldScanMetadata()).thenReturn(true); - Mockito.lenient().when(c.scanMetaData()).thenReturn(scanMetaDataSuccess); when(c.getContainerData().getVolume()).thenReturn(vol); + + try { + when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class))) + .thenReturn(scanDataSuccess); + Mockito.lenient().when(c.scanMetaData()).thenReturn(scanMetaDataSuccess); + } catch (InterruptedException ex) { + // Mockito.when invocations will not throw this exception. It is just + // required for compilation. + } } public static KeyValueContainer addContainerToDeletedDir( diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java index eb644d03503..134ec050e2a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java @@ -31,13 +31,17 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -62,7 +66,7 @@ public void setup() { @Test @Override - public void testRecentlyScannedContainerIsSkipped() { + public void testRecentlyScannedContainerIsSkipped() throws Exception { setScannedTimestampRecent(healthy); scanner.runIteration(); Mockito.verify(healthy, never()).scanData(any(), any()); @@ -70,7 +74,7 @@ public void testRecentlyScannedContainerIsSkipped() { @Test @Override - public void testPreviouslyScannedContainerIsScanned() { + public void testPreviouslyScannedContainerIsScanned() throws Exception { // If the last scan time is before than the configured gap, the container // should be scanned. setScannedTimestampOld(healthy); @@ -80,7 +84,7 @@ public void testPreviouslyScannedContainerIsScanned() { @Test @Override - public void testUnscannedContainerIsScanned() { + public void testUnscannedContainerIsScanned() throws Exception { // If there is no last scanned time, the container should be scanned. Mockito.when(healthy.getContainerData().lastDataScanTime()) .thenReturn(Optional.empty()); @@ -209,4 +213,27 @@ public void testWithVolumeFailure() throws Exception { Mockito.verify(corruptData, never()).scanData(any(), any()); Mockito.verify(openCorruptMetadata, never()).scanData(any(), any()); } + + + @Test + @Override + public void testShutdownDuringScan() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + // Make the data scan block until interrupt. + Mockito.when(healthy.scanData(any(), any())).then(i -> { + latch.countDown(); + Thread.sleep(Duration.ofDays(1).toMillis()); + return null; + }); + + scanner.start(); + // Wait for the scanner to reach the healthy container. + assertTrue(latch.await(5, TimeUnit.SECONDS)); + // Terminate the scanner while it is blocked scanning the healthy container. + scanner.shutdown(); + // The container should remain healthy. + verifyContainerMarkedUnhealthy(healthy, never()); + + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java index 712e89fde4d..ff542d667a1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerMetadataScanner.java @@ -31,7 +31,10 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -62,7 +65,7 @@ public void setup() { @Test @Override - public void testRecentlyScannedContainerIsSkipped() { + public void testRecentlyScannedContainerIsSkipped() throws Exception { // If the last scan time is before than the configured gap, the container // should be scanned. setScannedTimestampRecent(healthy); @@ -72,7 +75,7 @@ public void testRecentlyScannedContainerIsSkipped() { @Test @Override - public void testPreviouslyScannedContainerIsScanned() { + public void testPreviouslyScannedContainerIsScanned() throws Exception { setScannedTimestampOld(healthy); scanner.runIteration(); Mockito.verify(healthy, atLeastOnce()).scanMetaData(); @@ -200,4 +203,25 @@ public void testWithVolumeFailure() throws Exception { Mockito.verify(corruptData, never()).scanMetaData(); Mockito.verify(openCorruptMetadata, never()).scanMetaData(); } + + @Test + @Override + public void testShutdownDuringScan() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + // Make the metadata scan block until interrupt. + Mockito.when(healthy.scanMetaData()).then(i -> { + latch.countDown(); + Thread.sleep(Duration.ofDays(1).toMillis()); + return null; + }); + + scanner.start(); + // Wait for the scanner to reach the healthy container. + assertTrue(latch.await(5, TimeUnit.SECONDS)); + // Terminate the scanner while it is blocked scanning the healthy container. + scanner.shutdown(); + // The container should remain healthy. + verifyContainerMarkedUnhealthy(healthy, never()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java index 504d4304f1f..1298eb8cad1 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannersAbstract.java @@ -110,6 +110,9 @@ public abstract void testPreviouslyScannedContainerIsScanned() @Test public abstract void testWithVolumeFailure() throws Exception; + @Test + public abstract void testShutdownDuringScan() throws Exception; + @Test public abstract void testUnhealthyContainerNotRescanned() throws Exception; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java index e89cecfae08..50fb1355e2c 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java @@ -33,6 +33,7 @@ import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; +import java.time.Duration; import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -222,6 +223,24 @@ public void testWithVolumeFailure() throws Exception { assertEquals(0, metrics.getNumUnHealthyContainers()); } + @Test + @Override + public void testShutdownDuringScan() throws Exception { + // Make the on demand scan block until interrupt. + Mockito.when(healthy.scanData(any(), any())).then(i -> { + Thread.sleep(Duration.ofDays(1).toMillis()); return null; + }); + + // Start the blocking scan. + OnDemandContainerDataScanner.init(conf, controller); + OnDemandContainerDataScanner.scanContainer(healthy); + // Shut down the on demand scanner. This will interrupt the blocked scan + // on the healthy container. + OnDemandContainerDataScanner.shutdown(); + // Interrupting the healthy container's scan should not mark it unhealthy. + verifyContainerMarkedUnhealthy(healthy, never()); + } + @Test @Override public void testUnhealthyContainerNotRescanned() throws Exception { @@ -258,8 +277,7 @@ public void testUnhealthyContainerNotRescanned() throws Exception { assertEquals(0, metrics.getNumUnHealthyContainers()); } - private void scanContainer(Container container) - throws Exception { + private void scanContainer(Container container) throws Exception { OnDemandContainerDataScanner.init(conf, controller); Optional> scanFuture = OnDemandContainerDataScanner.scanContainer(container);