Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
Expand Down Expand Up @@ -66,6 +67,8 @@ public class ContainerSet implements Iterable<Container<?>> {
private Clock clock;
private long recoveringTimeout;
private final Table<Long, String> containerIdsTable;
// Handler that will be invoked when a scan of a container in this set is requested.
private Consumer<Container<?>> containerScanHandler;

@VisibleForTesting
public ContainerSet(long recoveringTimeout) {
Expand Down Expand Up @@ -128,6 +131,29 @@ public void ensureContainerNotMissing(long containerId, State state) throws Stor
}
}

/**
 * Registers the callback used to service scan requests made through this container set. The given callback
 * replaces whatever handler was stored before.
 *
 * @param scanner A callback that will be invoked with the container to scan whenever a scan of a container in
 *                this set is requested.
 */
public void registerContainerScanHandler(Consumer<Container<?>> scanner) {
  containerScanHandler = scanner;
}

/**
 * Triggers a scan of a container in this set using the registered scan handler. This is a no-op if no scan handler
 * is registered or the container does not exist in the set. When a handler is registered but the container is not
 * found, a warning is logged.
 *
 * @param containerID The container in this set to scan.
 */
public void scanContainer(long containerID) {
  // Read the handler field exactly once. It is a plain mutable field, so checking it for null and then reading it
  // again for the call could observe two different values if a handler is registered concurrently.
  final Consumer<Container<?>> handler = containerScanHandler;
  if (handler == null) {
    return;
  }

  Container<?> container = getContainer(containerID);
  if (container == null) {
    LOG.warn("Request to scan container {} which was not found in the container set", containerID);
    return;
  }
  handler.accept(container);
}

/**
* Add Container to container map.
* @param container container to be added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.hadoop.ozone.container.common.volume.VolumeUsage;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
Expand Down Expand Up @@ -428,7 +427,7 @@ && getMissingContainerSet().contains(containerID)) {
// Create a specific exception that signals for on demand scanning
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
OnDemandContainerDataScanner.scanContainer(container);
containerSet.scanContainer(containerID);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -1507,6 +1506,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
Set<DatanodeDetails> peers) throws IOException {
KeyValueContainer kvContainer = (KeyValueContainer) container;
KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
long containerID = containerData.getContainerID();
Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo = checksumManager.read(containerData);
ContainerProtos.ContainerChecksumInfo checksumInfo;

Expand All @@ -1521,10 +1521,10 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
for (DatanodeDetails peer : peers) {
long start = Instant.now().toEpochMilli();
ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
containerData.getContainerID(), peer);
containerID, peer);
if (peerChecksumInfo == null) {
LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
containerData.getContainerID(), peer);
containerID, peer);
continue;
}

Expand All @@ -1538,7 +1538,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
handleMissingBlock(kvContainer, pipeline, dnClient, missingBlock, chunkByteBuffer);
} catch (IOException e) {
LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
containerData.getContainerID(), e);
containerID, e);
}
}

Expand All @@ -1548,7 +1548,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer);
} catch (IOException e) {
LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
containerData.getContainerID(), e);
containerID, e);
}
}

Expand All @@ -1558,7 +1558,7 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(), entry.getValue(), chunkByteBuffer);
} catch (IOException e) {
LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
containerData.getContainerID(), e);
containerID, e);
}
}
// Update checksum based on RocksDB metadata. The read chunk validates the checksum of the data
Expand All @@ -1570,18 +1570,18 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
if (dataChecksum == oldDataChecksum) {
metrics.incContainerReconciledWithoutChanges();
LOG.info("Container {} reconciled with peer {}. No change in checksum. Current checksum {}. Time taken {} ms",
containerData.getContainerID(), peer.toString(), checksumToString(dataChecksum), duration);
containerID, peer.toString(), checksumToString(dataChecksum), duration);
} else {
metrics.incContainerReconciledWithChanges();
LOG.warn("Container {} reconciled with peer {}. Checksum updated from {} to {}. Time taken {} ms",
containerData.getContainerID(), peer.toString(), checksumToString(oldDataChecksum),
containerID, peer.toString(), checksumToString(oldDataChecksum),
checksumToString(dataChecksum), duration);
}
ContainerLogger.logReconciled(container.getContainerData(), oldDataChecksum, peer);
}

// Trigger manual on demand scanner
OnDemandContainerDataScanner.scanContainer(container);
containerSet.scanContainer(containerID);
sendICR(container);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.hadoop.ozone.container.ozoneimpl.AbstractBackgroundContainerScanner.logUnhealthyScanResult;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
Expand All @@ -45,8 +44,6 @@ public final class OnDemandContainerDataScanner {
public static final Logger LOG =
LoggerFactory.getLogger(OnDemandContainerDataScanner.class);

private static volatile OnDemandContainerDataScanner instance;

private final ExecutorService scanExecutor;
private final ContainerController containerController;
private final DataTransferThrottler throttler;
Expand All @@ -56,7 +53,7 @@ public final class OnDemandContainerDataScanner {
private final OnDemandScannerMetrics metrics;
private final long minScanGap;

private OnDemandContainerDataScanner(
public OnDemandContainerDataScanner(
ContainerScannerConfiguration conf, ContainerController controller) {
containerController = controller;
throttler = new DataTransferThrottler(
Expand All @@ -68,26 +65,11 @@ private OnDemandContainerDataScanner(
minScanGap = conf.getContainerScanMinGap();
}

public static synchronized void init(
ContainerScannerConfiguration conf, ContainerController controller) {
if (instance != null) {
LOG.warn("Trying to initialize on demand scanner" +
" a second time on a datanode.");
return;
}
instance = new OnDemandContainerDataScanner(conf, controller);
}

private static boolean shouldScan(Container<?> container) {
private boolean shouldScan(Container<?> container) {
if (container == null) {
return false;
}
long containerID = container.getContainerData().getContainerID();
if (instance == null) {
LOG.debug("Skipping on demand scan for container {} since scanner was " +
"not initialized.", containerID);
return false;
}

HddsVolume containerVolume = container.getContainerData().getVolume();
if (containerVolume.isFailed()) {
Expand All @@ -96,36 +78,36 @@ private static boolean shouldScan(Container<?> container) {
return false;
}

return !ContainerUtils.recentlyScanned(container, instance.minScanGap,
return !ContainerUtils.recentlyScanned(container, minScanGap,
LOG) && container.shouldScanData();
}

public static Optional<Future<?>> scanContainer(Container<?> container) {
public Optional<Future<?>> scanContainer(Container<?> container) {
if (!shouldScan(container)) {
return Optional.empty();
}

Future<?> resultFuture = null;
long containerId = container.getContainerData().getContainerID();
if (addContainerToScheduledContainers(containerId)) {
resultFuture = instance.scanExecutor.submit(() -> {
resultFuture = scanExecutor.submit(() -> {
performOnDemandScan(container);
removeContainerFromScheduledContainers(containerId);
});
}
return Optional.ofNullable(resultFuture);
}

private static boolean addContainerToScheduledContainers(long containerId) {
return instance.containerRescheduleCheckSet.add(containerId);
private boolean addContainerToScheduledContainers(long containerId) {
return containerRescheduleCheckSet.add(containerId);
}

private static void removeContainerFromScheduledContainers(
private void removeContainerFromScheduledContainers(
long containerId) {
instance.containerRescheduleCheckSet.remove(containerId);
containerRescheduleCheckSet.remove(containerId);
}

private static void performOnDemandScan(Container<?> container) {
private void performOnDemandScan(Container<?> container) {
if (!shouldScan(container)) {
return;
}
Expand All @@ -135,29 +117,29 @@ private static void performOnDemandScan(Container<?> container) {
ContainerData containerData = container.getContainerData();
logScanStart(containerData);

ScanResult result = container.scanData(instance.throttler, instance.canceler);
ScanResult result = container.scanData(throttler, canceler);
// Metrics for skipped containers should not be updated.
if (result.isDeleted()) {
LOG.debug("Container [{}] has been deleted during the data scan.", containerId);
} else {
if (!result.isHealthy()) {
logUnhealthyScanResult(containerId, result, LOG);
boolean containerMarkedUnhealthy = instance.containerController
boolean containerMarkedUnhealthy = containerController
.markContainerUnhealthy(containerId, result);
if (containerMarkedUnhealthy) {
instance.metrics.incNumUnHealthyContainers();
metrics.incNumUnHealthyContainers();
}
}
// TODO HDDS-10374 will need to update the merkle tree here as well.
instance.metrics.incNumContainersScanned();
metrics.incNumContainersScanned();
}

// Even if the container was deleted, mark the scan as completed since we already logged it as starting.
Instant now = Instant.now();
logScanCompleted(containerData, now);

if (!result.isDeleted()) {
instance.containerController.updateDataScanTimestamp(containerId, now);
containerController.updateDataScanTimestamp(containerId, now);
}
} catch (IOException e) {
LOG.warn("Unexpected exception while scanning container "
Expand All @@ -169,7 +151,7 @@ private static void performOnDemandScan(Container<?> container) {
}
}

private static void logScanStart(ContainerData containerData) {
private void logScanStart(ContainerData containerData) {
if (LOG.isDebugEnabled()) {
Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
Expand All @@ -178,35 +160,17 @@ private static void logScanStart(ContainerData containerData) {
}
}

private static void logScanCompleted(
private void logScanCompleted(
ContainerData containerData, Instant timestamp) {
LOG.debug("Completed scan of container {} at {}",
containerData.getContainerID(), timestamp);
}

public static OnDemandScannerMetrics getMetrics() {
return instance.metrics;
}

@VisibleForTesting
public static DataTransferThrottler getThrottler() {
return instance.throttler;
}

@VisibleForTesting
public static Canceler getCanceler() {
return instance.canceler;
}

public static synchronized void shutdown() {
if (instance == null) {
return;
}
instance.shutdownScanner();
public OnDemandScannerMetrics getMetrics() {
return metrics;
}

private synchronized void shutdownScanner() {
instance = null;
public synchronized void shutdown() {
metrics.unregister();
String shutdownMessage = "On-demand container scanner is shutting down.";
LOG.info(shutdownMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class OzoneContainer {
private final XceiverServerSpi readChannel;
private final ContainerController controller;
private BackgroundContainerMetadataScanner metadataScanner;
private OnDemandContainerDataScanner onDemandScanner;
private List<BackgroundContainerDataScanner> dataScanners;
private List<AbstractBackgroundContainerScanner> backgroundScanners;
private final BlockDeletingService blockDeletingService;
Expand Down Expand Up @@ -432,7 +433,8 @@ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) {
"so the on-demand container data scanner will not start.");
return;
}
OnDemandContainerDataScanner.init(c, controller);
onDemandScanner = new OnDemandContainerDataScanner(c, controller);
containerSet.registerContainerScanHandler(onDemandScanner::scanContainer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass the OnDemandContainerDataScanner itself here instead of using Consumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but I would rather keep the coupling between the two classes looser. We could make a scanner interface but it would only have one method and only be used here. The consumer was a light-weight way to get this same functionality.

}

/**
Expand All @@ -451,7 +453,7 @@ private void stopContainerScrub() {
for (BackgroundContainerDataScanner s : dataScanners) {
s.shutdown();
}
OnDemandContainerDataScanner.shutdown();
onDemandScanner.shutdown();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -284,6 +285,29 @@ public void testListContainerFromFirstKey(ContainerLayoutVersion layout)
assertContainerIds(FIRST_ID, count, result);
}

@ContainerLayoutTestInfo.ContainerTest
public void testContainerScanHandler(ContainerLayoutVersion layout) throws Exception {
  setLayoutVersion(layout);
  ContainerSet set = createContainerSet();

  // Before any handler is registered, a scan request must complete without throwing.
  set.scanContainer(FIRST_ID);

  AtomicLong handlerCalls = new AtomicLong();
  set.registerContainerScanHandler(scanned -> {
    // The handler must only ever be handed the existing container; a call for a missing one would fail here.
    assertEquals(FIRST_ID, scanned.getContainerData().getContainerID());
    handlerCalls.incrementAndGet();
  });

  // A registered handler is invoked exactly once for a container that exists in the set.
  set.scanContainer(FIRST_ID);
  assertEquals(1, handlerCalls.get());

  // A request for an ID that is not in the set neither throws nor reaches the handler.
  set.scanContainer(FIRST_ID - 1);
  assertEquals(1, handlerCalls.get());
}

/**
* Verify that {@code result} contains {@code count} containers
* with IDs in increasing order starting at {@code startId}.
Expand Down
Loading