Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ ContainerReplicaProto getContainerReport()
* @return true if the integrity checks pass
* Scan the container metadata to detect corruption.
*/
boolean scanMetaData();
boolean scanMetaData() throws InterruptedException;

/**
* Return if the container data should be checksum verified to detect
Expand All @@ -193,6 +193,8 @@ ContainerReplicaProto getContainerReport()
* I/O bandwidth throttling (e.g. for shutdown purpose).
* @return true if the checksum verification succeeds
* false otherwise
* @throws InterruptedException if the scan is interrupted.
*/
boolean scanData(DataTransferThrottler throttler, Canceler canceler);
boolean scanData(DataTransferThrottler throttler, Canceler canceler)
throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ShutdownHookManager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,13 +83,13 @@ public class MutableVolumeSet implements VolumeSet {
private final String datanodeUuid;
private String clusterID;

private Runnable shutdownHook;
private final StorageVolumeChecker volumeChecker;
private Runnable failedVolumeListener;
private StateContext context;
private final StorageVolumeFactory volumeFactory;
private final StorageVolume.VolumeType volumeType;
private int maxVolumeFailuresTolerated;
private boolean initialized;

public MutableVolumeSet(String dnUuid, ConfigurationSource conf,
StateContext context, StorageVolume.VolumeType volumeType,
Expand All @@ -103,6 +101,7 @@ public MutableVolumeSet(String dnUuid, String clusterID,
ConfigurationSource conf, StateContext context,
StorageVolume.VolumeType volumeType, StorageVolumeChecker volumeChecker
) throws IOException {
this.initialized = false;
this.context = context;
this.datanodeUuid = dnUuid;
this.clusterID = clusterID;
Expand Down Expand Up @@ -201,10 +200,7 @@ private void initializeVolumeSet() throws IOException {
}

checkAllVolumes();

// Ensure volume threads are stopped and scm df is saved during shutdown.
ShutdownHookManager.get().addShutdownHook(this::shutdown,
SHUTDOWN_HOOK_PRIORITY);
initialized = true;
}

/**
Expand Down Expand Up @@ -258,7 +254,7 @@ private void handleVolumeFailures(
// check failed volume tolerated
if (!hasEnoughVolumes()) {
// on startup, we could not try to stop uninitialized services
if (shutdownHook == null) {
if (!initialized) {
throw new IOException("Don't have enough good volumes on startup,"
+ " bad volumes detected: " + failedVolumes.size()
+ " max tolerated: " + maxVolumeFailuresTolerated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ public boolean shouldScanMetadata() {
}

@Override
public boolean scanMetaData() {
public boolean scanMetaData() throws InterruptedException {
long containerId = containerData.getContainerID();
KeyValueContainerCheck checker =
new KeyValueContainerCheck(containerData.getMetadataPath(), config,
Expand All @@ -900,7 +900,8 @@ public boolean shouldScanData() {
}

@Override
public boolean scanData(DataTransferThrottler throttler, Canceler canceler) {
public boolean scanData(DataTransferThrottler throttler, Canceler canceler)
throws InterruptedException {
if (!shouldScanData()) {
throw new IllegalStateException("The checksum verification can not be" +
" done for container in state "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public KeyValueContainerCheck(String metadataPath, ConfigurationSource conf,
*
* @return true : integrity checks pass, false : otherwise.
*/
public boolean fastCheck() {
public boolean fastCheck() throws InterruptedException {
LOG.debug("Running basic checks for container {};", containerID);
boolean valid = false;
try {
Expand All @@ -97,6 +97,10 @@ public boolean fastCheck() {
valid = true;

} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Metadata scan of container " +
containerID + " interrupted.");
}
handleCorruption(e);
}

Expand All @@ -114,7 +118,8 @@ public boolean fastCheck() {
*
* @return true : integrity checks pass, false : otherwise.
*/
public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) {
public boolean fullCheck(DataTransferThrottler throttler,
Canceler canceler) throws InterruptedException {
boolean valid;

try {
Expand All @@ -123,6 +128,10 @@ public boolean fullCheck(DataTransferThrottler throttler, Canceler canceler) {
scanData(throttler, canceler);
}
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Data scan of container " + containerID +
" interrupted.");
}
handleCorruption(e);
valid = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Base class for scheduled scanners on a Datanode.
Expand All @@ -37,15 +38,12 @@ public abstract class AbstractBackgroundContainerScanner extends Thread {

private final long dataScanInterval;

/**
* True if the thread is stopping.<p/>
* Protected by this object's lock.
*/
private volatile boolean stopping = false;
private final AtomicBoolean stopping;

public AbstractBackgroundContainerScanner(String name,
long dataScanInterval) {
this.dataScanInterval = dataScanInterval;
this.stopping = new AtomicBoolean(false);
setName(name);
setDaemon(true);
}
Expand All @@ -54,7 +52,7 @@ public AbstractBackgroundContainerScanner(String name,
public final void run() {
AbstractContainerScannerMetrics metrics = getMetrics();
try {
while (!stopping) {
while (!stopping.get()) {
runIteration();
metrics.resetNumContainersScanned();
metrics.resetNumUnhealthyContainers();
Expand All @@ -74,7 +72,7 @@ public final void runIteration() {
long startTime = System.nanoTime();
scanContainers();
long totalDuration = System.nanoTime() - startTime;
if (stopping) {
if (stopping.get()) {
return;
}
AbstractContainerScannerMetrics metrics = getMetrics();
Expand All @@ -94,10 +92,12 @@ public final void runIteration() {

public final void scanContainers() {
Iterator<Container<?>> itr = getContainerIterator();
while (!stopping && itr.hasNext()) {
while (!stopping.get() && itr.hasNext()) {
Container<?> c = itr.next();
try {
scanContainer(c);
} catch (InterruptedException ex) {
stopping.set(true);
} catch (IOException ex) {
LOG.warn("Unexpected exception while scanning container "
+ c.getContainerData().getContainerID(), ex);
Expand All @@ -107,28 +107,35 @@ public final void scanContainers() {

public abstract Iterator<Container<?>> getContainerIterator();

public abstract void scanContainer(Container<?> c) throws IOException;
public abstract void scanContainer(Container<?> c)
throws IOException, InterruptedException;

public final void handleRemainingSleep(long remainingSleep) {
if (remainingSleep > 0) {
try {
Thread.sleep(remainingSleep);
} catch (InterruptedException ignored) {
this.stopping = true;
stopping.set(true);
LOG.warn("Background container scan was interrupted.");
Thread.currentThread().interrupt();
}
}
}

/**
* Shutdown the current container scanning thread.
* If the thread is already being shutdown, the call will block until the
* shutdown completes.
*/
public synchronized void shutdown() {
this.stopping = true;
this.interrupt();
try {
this.join();
} catch (InterruptedException ex) {
LOG.warn("Unexpected exception while stopping data scanner.", ex);
Thread.currentThread().interrupt();
if (stopping.compareAndSet(false, true)) {
this.interrupt();
try {
this.join();
} catch (InterruptedException ex) {
LOG.warn("Unexpected exception while stopping data scanner.", ex);
Thread.currentThread().interrupt();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private boolean shouldScan(Container<?> container) {
}

@Override
public void scanContainer(Container<?> c) throws IOException {
public void scanContainer(Container<?> c)
throws IOException, InterruptedException {
// There is one background container data scanner per volume.
// If the volume fails, its scanning thread should terminate.
if (volume.isFailed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public Iterator<Container<?>> getContainerIterator() {

@VisibleForTesting
@Override
public void scanContainer(Container<?> container) throws IOException {
public void scanContainer(Container<?> container)
throws IOException, InterruptedException {
// There is one background container metadata scanner per datanode.
// If this container's volume has failed, skip the container.
// The iterator returned by getContainerIterator may have stale results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ private static void performOnDemandScan(Container<?> container) {
} catch (IOException e) {
LOG.warn("Unexpected exception while scanning container "
+ containerId, e);
} catch (InterruptedException ex) {
// This should only happen as part of shutdown, which will stop the
// ExecutorService.
LOG.info("On demand container scan interrupted.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,17 @@ public static void setupMockContainer(
when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
when(c.getContainerData()).thenReturn(data);
when(c.shouldScanData()).thenReturn(shouldScanData);
when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
.thenReturn(scanDataSuccess);
when(c.shouldScanMetadata()).thenReturn(true);
Mockito.lenient().when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
when(c.getContainerData().getVolume()).thenReturn(vol);

try {
when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
.thenReturn(scanDataSuccess);
Mockito.lenient().when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
} catch (InterruptedException ex) {
// Mockito.when invocations will not throw this exception. It is just
// required for compilation.
}
}

public static KeyValueContainer addContainerToDeletedDir(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
Expand All @@ -62,15 +66,15 @@ public void setup() {

@Test
@Override
public void testRecentlyScannedContainerIsSkipped() {
public void testRecentlyScannedContainerIsSkipped() throws Exception {
setScannedTimestampRecent(healthy);
scanner.runIteration();
Mockito.verify(healthy, never()).scanData(any(), any());
}

@Test
@Override
public void testPreviouslyScannedContainerIsScanned() {
public void testPreviouslyScannedContainerIsScanned() throws Exception {
// If the last scan time is before than the configured gap, the container
// should be scanned.
setScannedTimestampOld(healthy);
Expand All @@ -80,7 +84,7 @@ public void testPreviouslyScannedContainerIsScanned() {

@Test
@Override
public void testUnscannedContainerIsScanned() {
public void testUnscannedContainerIsScanned() throws Exception {
// If there is no last scanned time, the container should be scanned.
Mockito.when(healthy.getContainerData().lastDataScanTime())
.thenReturn(Optional.empty());
Expand Down Expand Up @@ -209,4 +213,27 @@ public void testWithVolumeFailure() throws Exception {
Mockito.verify(corruptData, never()).scanData(any(), any());
Mockito.verify(openCorruptMetadata, never()).scanData(any(), any());
}


@Test
@Override
public void testShutdownDuringScan() throws Exception {
CountDownLatch latch = new CountDownLatch(1);

// Make the data scan block until interrupt.
Mockito.when(healthy.scanData(any(), any())).then(i -> {
latch.countDown();
Thread.sleep(Duration.ofDays(1).toMillis());
return null;
});

scanner.start();
// Wait for the scanner to reach the healthy container.
assertTrue(latch.await(5, TimeUnit.SECONDS));
// Terminate the scanner while it is blocked scanning the healthy container.
scanner.shutdown();
// The container should remain healthy.
verifyContainerMarkedUnhealthy(healthy, never());

}
}
Loading