@@ -607,8 +607,13 @@ ContainerCommandResponseProto handleCloseContainer(
return malformedRequest(request);
}
try {
ContainerProtos.ContainerDataProto.State previousState = kvContainer.getContainerState();
markContainerForClose(kvContainer);
closeContainer(kvContainer);
if (previousState == RECOVERING) {
// trigger container scan for recovered containers, i.e., after EC reconstruction
containerSet.scanContainer(kvContainer.getContainerData().getContainerID(), "EC Reconstruction");
}
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
@@ -1592,10 +1597,22 @@ public void deleteContainer(Container container, boolean force)
deleteInternal(container, force);
}

@SuppressWarnings("checkstyle:MethodLength")
@Override
public void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
Collection<DatanodeDetails> peers) throws IOException {
long containerID = container.getContainerData().getContainerID();
try {
reconcileContainerInternal(dnClient, container, peers);
} finally {
// Trigger the on-demand scanner after reconciliation, even if it failed.
containerSet.scanContainerWithoutGap(containerID,
"Container reconciliation");
}
}

@SuppressWarnings("checkstyle:MethodLength")
private void reconcileContainerInternal(DNContainerOperationClient dnClient, Container<?> container,
Collection<DatanodeDetails> peers) throws IOException {
KeyValueContainer kvContainer = (KeyValueContainer) container;
KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
long containerID = containerData.getContainerID();
@@ -213,8 +213,14 @@ public Container importContainer(
public void exportContainer(final ContainerType type,
final long containerId, final OutputStream outputStream,
final TarContainerPacker packer) throws IOException {
handlers.get(type).exportContainer(
containerSet.getContainer(containerId), outputStream, packer);
try {
handlers.get(type).exportContainer(
containerSet.getContainer(containerId), outputStream, packer);
} catch (IOException e) {
// If the export fails, trigger an on-demand scan for the container.
containerSet.scanContainer(containerId, "Export failed");
throw e;
}
}

/**
@@ -126,6 +126,10 @@ public void importContainer(long containerID, Path tarFilePath,
targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
containerSet.addContainerByOverwriteMissingContainer(container);
containerSet.scanContainer(containerID, "Imported container");
} catch (Exception e) {
// Trigger a volume scan if the import failed.
StorageVolumeUtil.onFailure(containerData.getVolume());
throw e;
}
} finally {
importContainerProgress.remove(containerID);
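As a side note on the catch block above: StorageVolumeUtil.onFailure is the general hook for reporting a suspected volume problem, and per the comment above it triggers a volume scan. Below is a hedged sketch of the same pattern applied in an arbitrary datanode I/O path; it is not part of this diff, and the volume, sourcePath, and targetPath variables are assumed placeholders.

// Sketch only: report a possible disk problem so the volume scanner can inspect the volume.
try {
  Files.copy(sourcePath, targetPath);   // hypothetical I/O that may fail
} catch (IOException e) {
  StorageVolumeUtil.onFailure(volume);  // same call as in the catch block above
  throw e;
}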
@@ -207,14 +207,19 @@ public static KeyValueContainer getContainer(long containerId,
return new KeyValueContainer(kvData, new OzoneConfiguration());
}

public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
return getKeyValueHandler(config, datanodeId, contSet, volSet, metrics, new ContainerChecksumTreeManager(config));
}

/**
* Constructs an instance of KeyValueHandler that can be used for testing.
* This instance can be used for tests that do not need an ICR sender or {@link ContainerChecksumTreeManager}.
*/
public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics) {
return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c -> { },
new ContainerChecksumTreeManager(config));
String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics,
ContainerChecksumTreeManager checksumTreeManager) {
return new KeyValueHandler(config, datanodeId, contSet, volSet, metrics, c -> { }, checksumTreeManager);
}

/**
@@ -227,6 +232,12 @@ public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
return getKeyValueHandler(config, datanodeId, contSet, volSet, ContainerMetrics.create(config));
}

public static KeyValueHandler getKeyValueHandler(ConfigurationSource config,
String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerChecksumTreeManager checksumTreeManager) {
return getKeyValueHandler(config, datanodeId, contSet, volSet, ContainerMetrics.create(config),
checksumTreeManager);
}

public static HddsDispatcher getHddsDispatcher(OzoneConfiguration conf,
ContainerSet contSet,
VolumeSet volSet,
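For orientation, here is a short usage sketch (not part of this diff) of the new ContainerTestUtils overload that accepts a ContainerChecksumTreeManager. Passing a Mockito spy lets a test stub individual checksum operations to fail, which is how the reconciliation-failure test below exercises the error path; the conf, containerSet, volumeSet, and dnDetails variables are assumed test fixtures.

// Sketch only, assuming existing test fixtures (conf, containerSet, volumeSet, dnDetails)
// and the usual Mockito static imports (spy, doThrow, any).
ContainerChecksumTreeManager spyChecksumManager = spy(new ContainerChecksumTreeManager(conf));
KeyValueHandler handler = ContainerTestUtils.getKeyValueHandler(
    conf, dnDetails.getUuidString(), containerSet, volumeSet, spyChecksumManager);
// Force the next reconciliation to fail when the checksum info is read.
doThrow(IOException.class).when(spyChecksumManager).read(any());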
@@ -30,10 +30,13 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import java.io.File;
import java.io.IOException;
@@ -72,6 +75,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -90,6 +94,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -219,7 +224,7 @@ public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCo
.map(MockDatanode::getDnDetails)
.filter(other -> !current.getDnDetails().equals(other))
.collect(Collectors.toList());
current.reconcileContainer(dnClient, peers, CONTAINER_ID);
current.reconcileContainerSuccess(dnClient, peers, CONTAINER_ID);
}
// Reconciliation should have triggered a second on-demand scan for each replica. Wait for them to finish before
// checking the results.
@@ -305,6 +310,30 @@ public void testContainerReconciliationWithPeerFailure(FailureLocation failureLo
mockContainerProtocolCalls();
}

@Test
public void testContainerReconciliationFailureContainerScan()
throws Exception {
// Use synchronous on-demand scans to re-build the container merkle trees before reconciliation.
datanodes.forEach(d -> d.scanContainer(CONTAINER_ID));

// Each datanode should have completed the single on-demand scan triggered above.
waitForExpectedScanCount(1);

for (MockDatanode current : datanodes) {
doThrow(IOException.class).when(current.getHandler().getChecksumManager()).read(any());
List<DatanodeDetails> peers = datanodes.stream()
.map(MockDatanode::getDnDetails)
.filter(other -> !current.getDnDetails().equals(other))
.collect(Collectors.toList());
// Reconciliation should fail for each datanode, since the checksum info cannot be retrieved.
assertThrows(IOException.class, () -> current.reconcileContainer(dnClient, peers, CONTAINER_ID));
Mockito.reset(current.getHandler().getChecksumManager());
}
// Even a failed reconciliation should have triggered a second on-demand scan for each replica.
waitForExpectedScanCount(2);
}

/**
* Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on
* every datanode.
@@ -421,7 +450,8 @@ private static class MockDatanode {

containerSet = newContainerSet();
MutableVolumeSet volumeSet = createVolumeSet();
handler = ContainerTestUtils.getKeyValueHandler(conf, dnDetails.getUuidString(), containerSet, volumeSet);
handler = ContainerTestUtils.getKeyValueHandler(conf, dnDetails.getUuidString(), containerSet, volumeSet,
spy(new ContainerChecksumTreeManager(conf)));
handler.setClusterID(CLUSTER_ID);

ContainerController controller = new ContainerController(containerSet,
@@ -436,6 +466,10 @@ public DatanodeDetails getDnDetails() {
return dnDetails;
}

public KeyValueHandler getHandler() {
return handler;
}

/**
* @throws IOException for general IO errors accessing the checksum file
* @throws java.io.FileNotFoundException When the checksum file does not exist.
@@ -542,16 +576,21 @@ public void resetOnDemandScanCount() {
onDemandScanner.getMetrics().resetNumContainersScanned();
}

public void reconcileContainer(DNContainerOperationClient client, Collection<DatanodeDetails> peers,
public void reconcileContainerSuccess(DNContainerOperationClient client, Collection<DatanodeDetails> peers,
long containerID) {
log.info("Beginning reconciliation on this mock datanode");
try {
handler.reconcileContainer(client, containerSet.getContainer(containerID), peers);
reconcileContainer(client, peers, containerID);
} catch (IOException ex) {
fail("Container reconciliation failed", ex);
}
}

public void reconcileContainer(DNContainerOperationClient client, Collection<DatanodeDetails> peers,
long containerID) throws IOException {
log.info("Beginning reconciliation on this mock datanode");
handler.reconcileContainer(client, containerSet.getContainer(containerID), peers);
}

/**
* Create a container with the specified number of blocks. Block data is human-readable so the block files can be
* inspected when debugging the test.
@@ -40,6 +40,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -392,6 +393,43 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion)
"Close container should return Invalid container error");
}

@ContainerLayoutTestInfo.ContainerTest
public void testCloseRecoveringContainerTriggersScan(ContainerLayoutVersion layoutVersion) {
final KeyValueHandler keyValueHandler = new KeyValueHandler(conf,
DATANODE_UUID, mockContainerSet, mock(MutableVolumeSet.class), mock(ContainerMetrics.class),
c -> { }, new ContainerChecksumTreeManager(conf));

conf = new OzoneConfiguration();
KeyValueContainerData kvData = new KeyValueContainerData(DUMMY_CONTAINER_ID,
layoutVersion,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
UUID.randomUUID().toString());
kvData.setMetadataPath(tempDir.toString());
kvData.setDbFile(dbFile.toFile());
KeyValueContainer container = new KeyValueContainer(kvData, conf);
ContainerCommandRequestProto createContainerRequest =
createContainerRequest(DATANODE_UUID, DUMMY_CONTAINER_ID);
keyValueHandler.handleCreateContainer(createContainerRequest, container);

// Mark the container state as RECOVERING.
kvData.setState(State.RECOVERING);

// Create Close container request
ContainerCommandRequestProto closeContainerRequest =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.CloseContainer)
.setContainerID(DUMMY_CONTAINER_ID)
.setDatanodeUuid(DATANODE_UUID)
.setCloseContainer(ContainerProtos.CloseContainerRequestProto
.getDefaultInstance())
.build();
dispatcher.dispatch(closeContainerRequest, null);

keyValueHandler.handleCloseContainer(closeContainerRequest, container);

verify(mockContainerSet, atLeastOnce()).scanContainer(DUMMY_CONTAINER_ID, "EC Reconstruction");
}

@Test
public void testCreateContainerWithFailure() throws Exception {
final String testDir = tempDir.toString();
@@ -28,7 +28,9 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@@ -56,6 +58,7 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
@@ -70,6 +73,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.MockedStatic;

/**
* Test for {@link ContainerImporter}.
@@ -201,6 +205,19 @@ public void testImportContainerTriggersOnDemandScanner() throws Exception {
verify(containerSet, atLeastOnce()).scanContainer(containerId, "Imported container");
}

@Test
public void testImportContainerFailureTriggersVolumeScan() throws Exception {
HddsVolume targetVolume = mock(HddsVolume.class);
try (MockedStatic<StorageVolumeUtil> mockedStatic = mockStatic(StorageVolumeUtil.class)) {
when(controllerMock.importContainer(any(ContainerData.class), any(), any())).thenThrow(new IOException());
// Attempt the import, which is expected to fail and trigger a volume scan.
File tarFile = containerTarFile(containerId, containerData);
assertThrows(IOException.class, () -> containerImporter.importContainer(containerId, tarFile.toPath(),
targetVolume, NO_COMPRESSION));
mockedStatic.verify(() -> StorageVolumeUtil.onFailure(any()), times(1));
}
}

@Test
public void testImportContainerResetsLastScanTime() throws Exception {
containerData.setDataScanTimestamp(Time.monotonicNow());