Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -318,21 +318,11 @@ private Lock getLock(long containerID) {
* swapped into place.
*/
public Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
long containerID = data.getContainerID();
File checksumFile = getContainerChecksumFile(data);
try {
if (!checksumFile.exists()) {
LOG.debug("No checksum file currently exists for container {} at the path {}", containerID, checksumFile);
return Optional.empty();
}
try (FileInputStream inStream = new FileInputStream(checksumFile)) {
return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(),
() -> Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream)));
}
return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(), () -> readChecksumInfo(data));
} catch (IOException ex) {
metrics.incrementMerkleTreeReadFailures();
throw new IOException("Error occurred when reading container merkle tree for containerID "
+ data.getContainerID() + " at path " + checksumFile, ex);
throw new IOException(ex);
}
}

Expand Down Expand Up @@ -383,6 +373,29 @@ public ByteString getContainerChecksumInfo(KeyValueContainerData data) throws IO
}
}

/**
* Reads the container checksum info file (containerID.tree) from the disk.
* Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
* swapped into place.
*/
public static Optional<ContainerProtos.ContainerChecksumInfo> readChecksumInfo(ContainerData data)
throws IOException {
long containerID = data.getContainerID();
File checksumFile = getContainerChecksumFile(data);
try {
if (!checksumFile.exists()) {
LOG.debug("No checksum file currently exists for container {} at the path {}", containerID, checksumFile);
return Optional.empty();
}
try (FileInputStream inStream = new FileInputStream(checksumFile)) {
return Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream));
}
} catch (IOException ex) {
throw new IOException("Error occurred when reading container merkle tree for containerID "
+ data.getContainerID() + " at path " + checksumFile, ex);
}
}

@VisibleForTesting
public ContainerMerkleTreeMetrics getMetrics() {
return this.metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerChecksumInfo;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
Expand Down Expand Up @@ -277,6 +280,23 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData,
}
}

private static void populateContainerDataChecksum(KeyValueContainerData kvContainerData) {
if (kvContainerData.isOpen()) {
return;
}

try {
Optional<ContainerChecksumInfo> optionalContainerChecksumInfo = ContainerChecksumTreeManager
.readChecksumInfo(kvContainerData);
if (optionalContainerChecksumInfo.isPresent()) {
ContainerChecksumInfo containerChecksumInfo = optionalContainerChecksumInfo.get();
kvContainerData.setDataChecksum(containerChecksumInfo.getContainerMerkleTree().getDataChecksum());
}
} catch (IOException ex) {
LOG.warn("Failed to read checksum info for container {}", kvContainerData.getContainerID(), ex);
}
}

private static void populateContainerMetadata(
KeyValueContainerData kvContainerData, DatanodeStore store,
boolean bCheckChunksFilePath)
Expand Down Expand Up @@ -356,6 +376,7 @@ private static void populateContainerMetadata(

// Load finalizeBlockLocalIds for container in memory.
populateContainerFinalizeBlock(kvContainerData, store);
populateContainerDataChecksum(kvContainerData);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public void testDeletedBlocksPreservedOnTreeWrite() throws Exception {
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
checksumManager.markBlocksAsDeleted(container, new ArrayList<>(expectedBlocksToDelete));
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
ContainerMerkleTreeWriter tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 0);
Expand All @@ -222,7 +221,6 @@ public void testTreePreservedOnDeletedBlocksWrite() throws Exception {
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
ContainerMerkleTreeWriter tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
checksumManager.markBlocksAsDeleted(container, new ArrayList<>(expectedBlocksToDelete));
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 0);
Expand All @@ -242,8 +240,6 @@ public void testReadContainerMerkleTreeMetric() throws Exception {
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
ContainerMerkleTreeWriter tree = buildTestTree(config);
checksumManager.writeContainerDataTree(container, tree);
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 0);
checksumManager.writeContainerDataTree(container, tree);
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 0);
assertTrue(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total() > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public String toString() {
",replicaIndex=" + replicaIndex :
"") +
", isEmpty=" + isEmpty +
", dataChecksum=" + dataChecksum +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ public void testReplicasAreReportedForClosedContainerAfterRestart()
// Ensure 3 replicas are reported successfully as expected.
GenericTestUtils.waitFor(() ->
getContainerReplicas(newContainer).size() == 3, 200, 30000);
for (ContainerReplica replica : getContainerReplicas(newContainer)) {
assertNotEquals(0, replica.getDataChecksum());
}
}

/**
Expand Down Expand Up @@ -198,6 +201,10 @@ public void testCloseClosedContainer()
assertTrue(containerChecksumFileExists(hddsDatanode, container));
}

for (ContainerReplica replica : getContainerReplicas(container)) {
assertNotEquals(0, replica.getDataChecksum());
}

assertThrows(IOException.class,
() -> cluster.getStorageContainerLocationClient()
.closeContainer(container.getContainerID()),
Expand Down Expand Up @@ -269,6 +276,12 @@ public void testContainerChecksumForClosedContainer() throws Exception {
assertNotEquals(prevExpectedChecksumInfo1.getContainerID(), prevExpectedChecksumInfo2.getContainerID());
assertNotEquals(prevExpectedChecksumInfo1.getContainerMerkleTree().getDataChecksum(),
prevExpectedChecksumInfo2.getContainerMerkleTree().getDataChecksum());
for (ContainerReplica replica : getContainerReplicas(containerInfo1)) {
assertNotEquals(0, replica.getDataChecksum());
}
for (ContainerReplica replica : getContainerReplicas(containerInfo2)) {
assertNotEquals(0, replica.getDataChecksum());
}
}

private boolean checkContainerCloseInDatanode(HddsDatanodeService hddsDatanode,
Expand Down
Loading