Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ public void registerOnDemandScanner(OnDemandContainerScanner scanner) {
* exist in the set.
* @param containerID The container in this set to scan.
*/
public void scanContainer(long containerID) {
public void scanContainer(long containerID, String reasonForScan) {
if (containerScanner != null) {
Container<?> container = getContainer(containerID);
if (container != null) {
containerScanner.scanContainer(container);
containerScanner.scanContainer(container, reasonForScan);
} else {
LOG.warn("Request to scan container {} which was not found in the container set", containerID);
}
Expand All @@ -160,11 +160,11 @@ public void scanContainer(long containerID) {
* This is a no-op if no scanner is registered or the container does not exist in the set.
* @param containerID The container in this set to scan.
*/
public void scanContainerWithoutGap(long containerID) {
public void scanContainerWithoutGap(long containerID, String reasonForScan) {
if (containerScanner != null) {
Container<?> container = getContainer(containerID);
if (container != null) {
containerScanner.scanContainerWithoutGap(container);
containerScanner.scanContainerWithoutGap(container, reasonForScan);
} else {
LOG.warn("Request to scan container {} which was not found in the container set", containerID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ && getMissingContainerSet().contains(containerID)) {
// Create a specific exception that signals for on demand scanning
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
containerSet.scanContainer(containerID);
containerSet.scanContainer(containerID, result.name());
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1709,7 +1709,8 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late
}

// Trigger on demand scanner, which will build the merkle tree based on the newly ingested data.
containerSet.scanContainerWithoutGap(containerID);
containerSet.scanContainerWithoutGap(containerID,
"Container reconciliation");
sendICR(container);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ public OnDemandContainerScanner(
* @return An Optional containing a Future representing the pending scan task if the task is queued.
* The optional is empty if the task is not queued due to an ongoing scan.
*/
public Optional<Future<?>> scanContainer(Container<?> container) {
return scanContainer(container, scannerHelper);
public Optional<Future<?>> scanContainer(Container<?> container, String reasonForScan) {
return scanContainer(container, scannerHelper, reasonForScan);
}

/**
* Triggers an on-demand scan of this container regardless of whether it was recently scanned.
* @return An Optional containing a Future representing the pending scan task if the task is queued.
* The optional is empty if the task is not queued due to an ongoing scan.
*/
public Optional<Future<?>> scanContainerWithoutGap(Container<?> container) {
return scanContainer(container, scannerHelperWithoutGap);
public Optional<Future<?>> scanContainerWithoutGap(Container<?> container, String reasonForScan) {
return scanContainer(container, scannerHelperWithoutGap, reasonForScan);
}

private Optional<Future<?>> scanContainer(Container<?> container, ContainerScanHelper helper) {
private Optional<Future<?>> scanContainer(Container<?> container, ContainerScanHelper helper, String reasonForScan) {
if (!helper.shouldScanMetadata(container)) {
return Optional.empty();
}
Expand All @@ -87,9 +87,14 @@ private Optional<Future<?>> scanContainer(Container<?> container, ContainerScanH
long containerId = container.getContainerData().getContainerID();
if (addContainerToScheduledContainers(containerId)) {
resultFuture = scanExecutor.submit(() -> {
performOnDemandScan(container, helper);
performOnDemandScan(container, helper, reasonForScan);
removeContainerFromScheduledContainers(containerId);
});
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping OnDemandScan for Container {} triggered due to '{}'. Reason: Already scheduled.",
containerId, reasonForScan);
}
}
return Optional.ofNullable(resultFuture);
}
Expand All @@ -103,7 +108,11 @@ private void removeContainerFromScheduledContainers(
containerRescheduleCheckSet.remove(containerId);
}

private void performOnDemandScan(Container<?> container, ContainerScanHelper helper) {
private void performOnDemandScan(Container<?> container, ContainerScanHelper helper, String reasonForScan) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling OnDemandScan for Container {}, Reason: {}",
container.getContainerData().getContainerID(), reasonForScan);
}
try {
if (helper.shouldScanData(container)) {
helper.scanData(container, throttler, canceler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void importContainer(long containerID, Path tarFilePath,
// After container import is successful, increase used space for the volume and schedule an OnDemand scan for it
targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
containerSet.addContainerByOverwriteMissingContainer(container);
containerSet.scanContainer(containerID);
containerSet.scanContainer(containerID, "Imported container");
}
} finally {
importContainerProgress.remove(containerID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -62,6 +63,8 @@ public class TestContainerSet {

private ContainerLayoutVersion layoutVersion;

private static final String TEST_SCAN = "Test Scan";

private void setLayoutVersion(ContainerLayoutVersion layoutVersion) {
this.layoutVersion = layoutVersion;
}
Expand Down Expand Up @@ -293,11 +296,11 @@ public void testContainerScanHandler(ContainerLayoutVersion layout) throws Excep
setLayoutVersion(layout);
ContainerSet containerSet = createContainerSet();
// Scan when no handler is registered should not throw an exception.
containerSet.scanContainer(FIRST_ID);
containerSet.scanContainer(FIRST_ID, TEST_SCAN);

AtomicLong invocationCount = new AtomicLong();
OnDemandContainerScanner mockScanner = mock(OnDemandContainerScanner.class);
when(mockScanner.scanContainer(any())).then(inv -> {
when(mockScanner.scanContainer(any(), anyString())).then(inv -> {
KeyValueContainer c = inv.getArgument(0);
// If the handler was incorrectly triggered for a non-existent container, this assert would fail.
assertEquals(FIRST_ID, c.getContainerData().getContainerID());
Expand All @@ -307,11 +310,11 @@ public void testContainerScanHandler(ContainerLayoutVersion layout) throws Excep
containerSet.registerOnDemandScanner(mockScanner);

// Scan of an existing container when a handler is registered should trigger a scan.
containerSet.scanContainer(FIRST_ID);
containerSet.scanContainer(FIRST_ID, TEST_SCAN);
assertEquals(1, invocationCount.get());

// Scan of non-existent container should not throw exception or trigger an additional invocation.
containerSet.scanContainer(FIRST_ID - 1);
containerSet.scanContainer(FIRST_ID - 1, TEST_SCAN);
assertEquals(1, invocationCount.get());
}

Expand All @@ -320,11 +323,11 @@ public void testContainerScanHandlerWithoutGap(ContainerLayoutVersion layout) th
setLayoutVersion(layout);
ContainerSet containerSet = createContainerSet();
// Scan when no handler is registered should not throw an exception.
containerSet.scanContainer(FIRST_ID);
containerSet.scanContainer(FIRST_ID, TEST_SCAN);

AtomicLong invocationCount = new AtomicLong();
OnDemandContainerScanner mockScanner = mock(OnDemandContainerScanner.class);
when(mockScanner.scanContainerWithoutGap(any())).then(inv -> {
when(mockScanner.scanContainerWithoutGap(any(), anyString())).then(inv -> {
KeyValueContainer c = inv.getArgument(0);
// If the handler was incorrectly triggered for a non-existent container, this assert would fail.
assertEquals(FIRST_ID, c.getContainerData().getContainerID());
Expand All @@ -334,11 +337,11 @@ public void testContainerScanHandlerWithoutGap(ContainerLayoutVersion layout) th
containerSet.registerOnDemandScanner(mockScanner);

// Scan of an existing container when a handler is registered should trigger a scan.
containerSet.scanContainerWithoutGap(FIRST_ID);
containerSet.scanContainerWithoutGap(FIRST_ID, TEST_SCAN);
assertEquals(1, invocationCount.get());

// Scan of non-existent container should not throw exception or trigger an additional invocation.
containerSet.scanContainerWithoutGap(FIRST_ID - 1);
containerSet.scanContainerWithoutGap(FIRST_ID - 1, TEST_SCAN);
assertEquals(1, invocationCount.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public class TestContainerReconciliationWithMockDatanodes {
private static final int CHUNKS_PER_BLOCK = 4;
private static final int NUM_DATANODES = 3;

private static final String TEST_SCAN = "Test Scan";

/**
* Number of corrupt blocks and chunks.
*
Expand Down Expand Up @@ -424,7 +426,8 @@ public KeyValueContainer getContainer(long containerID) {
* Triggers a synchronous scan of the container. This method will block until the scan completes.
*/
public void scanContainer(long containerID) {
Optional<Future<?>> scanFuture = onDemandScanner.scanContainerWithoutGap(containerSet.getContainer(containerID));
Optional<Future<?>> scanFuture = onDemandScanner.scanContainerWithoutGap(containerSet.getContainer(containerID),
TEST_SCAN);
assertTrue(scanFuture.isPresent());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class TestOnDemandContainerScanner extends
TestContainerScannersAbstract {

private OnDemandContainerScanner onDemandScanner;
private static final String TEST_SCAN = "Test Scan";

@Override
@BeforeEach
Expand All @@ -88,7 +89,7 @@ public void testRecentlyScannedContainerIsSkipped() throws Exception {
public void testBypassScanGap() throws Exception {
setScannedTimestampRecent(healthy);

Optional<Future<?>> scanFutureOptional = onDemandScanner.scanContainerWithoutGap(healthy);
Optional<Future<?>> scanFutureOptional = onDemandScanner.scanContainerWithoutGap(healthy, TEST_SCAN);
assertTrue(scanFutureOptional.isPresent());
Future<?> scanFuture = scanFutureOptional.get();
scanFuture.get();
Expand Down Expand Up @@ -122,15 +123,15 @@ public void tearDown() {

@Test
public void testScanTimestampUpdated() throws Exception {
Optional<Future<?>> scanFuture = onDemandScanner.scanContainer(healthy);
Optional<Future<?>> scanFuture = onDemandScanner.scanContainer(healthy, TEST_SCAN);
assertTrue(scanFuture.isPresent());
scanFuture.get().get();
verify(controller, atLeastOnce())
.updateDataScanTimestamp(
eq(healthy.getContainerData().getContainerID()), any());

// Metrics for deleted container should not be updated.
scanFuture = onDemandScanner.scanContainer(healthy);
scanFuture = onDemandScanner.scanContainer(healthy, TEST_SCAN);
assertTrue(scanFuture.isPresent());
scanFuture.get().get();
verify(controller, never())
Expand All @@ -156,11 +157,11 @@ public void testSameContainerQueuedMultipleTimes() throws Exception {
latch.await();
return getUnhealthyDataScanResult();
});
Optional<Future<?>> onGoingScan = onDemandScanner.scanContainer(corruptData);
Optional<Future<?>> onGoingScan = onDemandScanner.scanContainer(corruptData, TEST_SCAN);
assertTrue(onGoingScan.isPresent());
assertFalse(onGoingScan.get().isDone());
//When scheduling the same container again
Optional<Future<?>> secondScan = onDemandScanner.scanContainer(corruptData);
Optional<Future<?>> secondScan = onDemandScanner.scanContainer(corruptData, TEST_SCAN);
//Then the second scan is not scheduled and the first scan can still finish
assertFalse(secondScan.isPresent());
latch.countDown();
Expand All @@ -178,11 +179,11 @@ public void testSameOpenContainerQueuedMultipleTimes() throws Exception {
latch.await();
return getUnhealthyDataScanResult();
});
Optional<Future<?>> onGoingScan = onDemandScanner.scanContainer(openCorruptMetadata);
Optional<Future<?>> onGoingScan = onDemandScanner.scanContainer(openCorruptMetadata, TEST_SCAN);
assertTrue(onGoingScan.isPresent());
assertFalse(onGoingScan.get().isDone());
//When scheduling the same container again
Optional<Future<?>> secondScan = onDemandScanner.scanContainer(openCorruptMetadata);
Optional<Future<?>> secondScan = onDemandScanner.scanContainer(openCorruptMetadata, TEST_SCAN);
//Then the second scan is not scheduled and the first scan can still finish
assertFalse(secondScan.isPresent());
latch.countDown();
Expand All @@ -195,12 +196,12 @@ public void testSameOpenContainerQueuedMultipleTimes() throws Exception {
@Override
public void testScannerMetrics() throws Exception {
ArrayList<Optional<Future<?>>> resultFutureList = Lists.newArrayList();
resultFutureList.add(onDemandScanner.scanContainer(corruptData));
resultFutureList.add(onDemandScanner.scanContainer(openContainer));
resultFutureList.add(onDemandScanner.scanContainer(openCorruptMetadata));
resultFutureList.add(onDemandScanner.scanContainer(healthy));
resultFutureList.add(onDemandScanner.scanContainer(corruptData, TEST_SCAN));
resultFutureList.add(onDemandScanner.scanContainer(openContainer, TEST_SCAN));
resultFutureList.add(onDemandScanner.scanContainer(openCorruptMetadata, TEST_SCAN));
resultFutureList.add(onDemandScanner.scanContainer(healthy, TEST_SCAN));
// Deleted containers will not count towards the scan count metric.
resultFutureList.add(onDemandScanner.scanContainer(deletedContainer));
resultFutureList.add(onDemandScanner.scanContainer(deletedContainer, TEST_SCAN));
waitOnScannerToFinish(resultFutureList);
OnDemandScannerMetrics metrics = onDemandScanner.getMetrics();
//Containers with shouldScanData = false shouldn't increase
Expand Down Expand Up @@ -269,7 +270,7 @@ public void testShutdownDuringScan() throws Exception {
});

// Start the blocking scan.
onDemandScanner.scanContainer(healthy);
onDemandScanner.scanContainer(healthy, TEST_SCAN);
// Shut down the on demand scanner. This will interrupt the blocked scan
// on the healthy container.
onDemandScanner.shutdown();
Expand Down Expand Up @@ -353,7 +354,7 @@ public void testUnhealthyContainersTriggersVolumeScan() throws Exception {
}

private void scanContainer(Container<?> container) throws Exception {
Optional<Future<?>> scanFuture = onDemandScanner.scanContainer(container);
Optional<Future<?>> scanFuture = onDemandScanner.scanContainer(container, TEST_SCAN);
if (scanFuture.isPresent()) {
scanFuture.get().get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public void testImportContainerTriggersOnDemandScanner() throws Exception {
containerImporter.importContainer(containerId, tarFile.toPath(),
targetVolume, NO_COMPRESSION);

verify(containerSet, atLeastOnce()).scanContainer(containerId);
verify(containerSet, atLeastOnce()).scanContainer(containerId, "Imported container");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class TestContainerCommandReconciliation {
private static DNContainerOperationClient dnClient;
private static final String KEY_NAME = "testkey";
private static final Logger LOG = LoggerFactory.getLogger(TestContainerCommandReconciliation.class);
private static final String TEST_SCAN = "Test Scan";

@TempDir
private static File testDir;
Expand Down Expand Up @@ -389,7 +390,7 @@ public void testContainerChecksumWithBlockMissing() throws Exception {
db.getStore().flushDB();
}

datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID);
datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN);
waitForDataChecksumsAtSCM(containerID, 2);
ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
readChecksumFile(container.getContainerData());
Expand Down Expand Up @@ -438,7 +439,7 @@ public void testContainerChecksumChunkCorruption() throws Exception {
TestContainerCorruptions.CORRUPT_BLOCK.applyTo(container, blockID);
}

datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID);
datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN);
waitForDataChecksumsAtSCM(containerID, 2);
ContainerProtos.ContainerChecksumInfo containerChecksumAfterChunkCorruption =
readChecksumFile(container.getContainerData());
Expand Down Expand Up @@ -509,7 +510,7 @@ public void testDataChecksumReportedAtSCM() throws Exception {
db.getStore().flushDB();
}

datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID);
datanodeStateMachine.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN);
waitForDataChecksumsAtSCM(containerID, 2);
ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
readChecksumFile(container.getContainerData());
Expand Down