Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,13 @@ && getMissingContainerSet().contains(containerID)) {
|| containerState == State.RECOVERING);
// mark and persist the container state to be unhealthy
try {
// TODO HDDS-7096 + HDDS-8781: Use on demand scanning for the open
// container instead.
ContainerScanError error = new ContainerScanError(ContainerScanError.FailureType.WRITE_FAILURE,
new File(container.getContainerData().getContainerPath()),
new StorageContainerException(result));
handler.markContainerUnhealthy(container, DataScanResult.fromErrors(Collections.singletonList(error)));
// For unhealthy containers, trigger an async on-demand scan to build container merkle tree,
// as the metadata-based tree may not be reliable due to potential data corruption.
containerSet.scanContainerWithoutGap(containerID, "Unhealthy container scan");
LOG.info("Marked Container UNHEALTHY, ContainerID: {}", containerID);
} catch (IOException ioe) {
// just log the error here in case marking the container fails,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Cont
public void markContainerUnhealthy(Container container, ScanResult reason)
throws IOException {
container.writeLock();
long containerID = 0L;
long containerID = container.getContainerData().getContainerID();
try {
if (container.getContainerState() == State.UNHEALTHY) {
LOG.debug("Call to mark already unhealthy container {} as unhealthy",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,11 @@ public void resumeContainerScrub() {
backgroundScanners.forEach(AbstractBackgroundContainerScanner::unpause);
}

@VisibleForTesting
public OnDemandContainerScanner getOnDemandScanner() {
return onDemandScanner;
}

/**
* Starts serving requests to ozone container.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -140,7 +139,7 @@ public void testReplicasAreReportedForClosedContainerAfterRestart()
// Checksum file exists after container close
for (HddsDatanodeService hddsDatanode: hddsDatanodes) {
GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, container), 100, 5000);
assertTrue(containerChecksumFileExists(hddsDatanode, container.getContainerID()));
GenericTestUtils.waitFor(() -> containerChecksumFileExists(hddsDatanode, container.getContainerID()), 100, 5000);
}

long originalSeq = container.getSequenceId();
Expand Down Expand Up @@ -197,7 +196,7 @@ public void testCloseClosedContainer()
// Checksum file exists after container close
for (HddsDatanodeService hddsDatanode: hddsDatanodes) {
GenericTestUtils.waitFor(() -> checkContainerCloseInDatanode(hddsDatanode, container), 100, 5000);
assertTrue(containerChecksumFileExists(hddsDatanode, container.getContainerID()));
GenericTestUtils.waitFor(() -> containerChecksumFileExists(hddsDatanode, container.getContainerID()), 100, 5000);
}

for (ContainerReplica replica : getContainerReplicas(container)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,9 @@ private OzoneOutputStream createKey(String keyName) throws Exception {
protected OzoneConfiguration getConf() {
return cluster.getConf();
}

protected HddsDatanodeService getDatanode() {
assertEquals(1, cluster.getHddsDatanodes().size());
return cluster.getHddsDatanodes().get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,26 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Instant;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandScannerMetrics;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand Down Expand Up @@ -182,4 +188,53 @@ void testCorruptionDetectedForOpenContainers(TestContainerCorruptions corruption
corruption.assertLogged(openContainerID, 1, logCapturer);
}

/**
* Test that {@link OnDemandContainerScanner} is triggered when the HddsDispatcher
* detects write failures and automatically triggers on-demand scans.
*/
@Test
void testOnDemandScanTriggeredByUnhealthyContainer() throws Exception {
long containerID = writeDataToOpenContainer();
Container<?> container = getDnContainer(containerID);
assertEquals(State.OPEN, container.getContainerState());

Optional<Instant> initialScanTime = container.getContainerData().lastDataScanTime();
HddsDatanodeService dn = getDatanode();
ContainerDispatcher dispatcher = dn.getDatanodeStateMachine().getContainer().getDispatcher();
OnDemandScannerMetrics scannerMetrics = dn.getDatanodeStateMachine().getContainer()
.getOnDemandScanner().getMetrics();
int initialScannedCount = scannerMetrics.getNumContainersScanned();

// Create a PutBlock request with malformed block data to trigger internal error
ContainerProtos.ContainerCommandRequestProto writeFailureRequest =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.PutBlock)
.setContainerID(containerID)
.setDatanodeUuid(dn.getDatanodeDetails().getUuidString())
.setPutBlock(ContainerProtos.PutBlockRequestProto.newBuilder()
.setBlockData(ContainerProtos.BlockData.newBuilder()
.setBlockID(ContainerProtos.DatanodeBlockID.newBuilder()
.setContainerID(containerID)
.setLocalID(999L)
.setBlockCommitSequenceId(1)
.build())
.setSize(1024) // Size mismatch with chunks
.build())
.build())
.build();

ContainerProtos.ContainerCommandResponseProto response = dispatcher.dispatch(writeFailureRequest, null);
assertNotEquals(ContainerProtos.Result.SUCCESS, response.getResult());
assertEquals(State.UNHEALTHY, container.getContainerState());

// The dispatcher should have called containerSet.scanContainerWithoutGap due to the failure
GenericTestUtils.waitFor(() -> {
Optional<Instant> currentScanTime = container.getContainerData().lastDataScanTime();
return currentScanTime.isPresent() && currentScanTime.get().isAfter(initialScanTime.orElse(Instant.EPOCH));
}, 500, 5000);

int finalScannedCount = scannerMetrics.getNumContainersScanned();
assertTrue(finalScannedCount > initialScannedCount);
}

}