@@ -88,6 +88,7 @@ public final class OzoneConsts {
   public static final Path ROOT_PATH = Paths.get(OZONE_ROOT);
 
   public static final String CONTAINER_EXTENSION = ".container";
+  public static final String CONTAINER_DATA_CHECKSUM_EXTENSION = ".tree";
   public static final String CONTAINER_META_PATH = "metadata";
   public static final String CONTAINER_TEMPORARY_CHUNK_PREFIX = "tmp";
   public static final String CONTAINER_CHUNK_NAME_DELIMITER = ".";
@@ -141,6 +142,7 @@ public final class OzoneConsts {
   public static final String CONTAINER_BYTES_USED = "#BYTESUSED";
   public static final String PENDING_DELETE_BLOCK_COUNT =
       "#PENDINGDELETEBLOCKCOUNT";
+  public static final String CONTAINER_DATA_CHECKSUM = "#DATACHECKSUM";
 
   /**
    * OM LevelDB prefixes.
@@ -19,6 +19,7 @@
 
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DATA_CHECKSUM_EXTENSION;
 import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,7 +34,6 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -321,14 +321,13 @@ public static long getDataChecksum(ContainerProtos.ContainerChecksumInfo checksumInfo) {
   /**
    * Returns the container checksum tree file for the specified container without deserializing it.
    */
-  @VisibleForTesting
   public static File getContainerChecksumFile(ContainerData data) {
-    return new File(data.getMetadataPath(), data.getContainerID() + ".tree");
+    return new File(data.getMetadataPath(), data.getContainerID() + CONTAINER_DATA_CHECKSUM_EXTENSION);
   }
 
   @VisibleForTesting
   public static File getTmpContainerChecksumFile(ContainerData data) {
-    return new File(data.getMetadataPath(), data.getContainerID() + ".tree.tmp");
+    return new File(data.getMetadataPath(), data.getContainerID() + CONTAINER_DATA_CHECKSUM_EXTENSION + ".tmp");
   }
 
   private Lock getLock(long containerID) {
@@ -343,8 +342,7 @@ private Lock getLock(long containerID) {
    */
   public ContainerProtos.ContainerChecksumInfo read(ContainerData data) throws IOException {
     try {
-      return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(), () ->
-          readChecksumInfo(data).orElse(ContainerProtos.ContainerChecksumInfo.newBuilder().build()));
+      return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(), () -> readChecksumInfo(data));
     } catch (IOException ex) {
       metrics.incrementMerkleTreeReadFailures();
       throw ex;
@@ -422,17 +420,18 @@ public ByteString getContainerChecksumInfo(KeyValueContainerData data) throws IOException {
    * Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
    * swapped into place.
    */
-  public static Optional<ContainerProtos.ContainerChecksumInfo> readChecksumInfo(ContainerData data)
+  public static ContainerProtos.ContainerChecksumInfo readChecksumInfo(ContainerData data)
       throws IOException {
     long containerID = data.getContainerID();
     File checksumFile = getContainerChecksumFile(data);
     try {
       if (!checksumFile.exists()) {
         LOG.debug("No checksum file currently exists for container {} at the path {}", containerID, checksumFile);
-        return Optional.empty();
+        return ContainerProtos.ContainerChecksumInfo.newBuilder().build();
       }
+
       try (InputStream inStream = Files.newInputStream(checksumFile.toPath())) {
-        return Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream));
+        return ContainerProtos.ContainerChecksumInfo.parseFrom(inStream);
       }
     } catch (IOException ex) {
       throw new IOException("Error occurred when reading container merkle tree for containerID "
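The javadoc on readChecksumInfo leans on the writer's tmp-file strategy: a new tree is written to a .tmp sibling and then atomically renamed over the real file, so readers never observe a half-written tree. A minimal sketch of that pattern, with hypothetical names rather than the actual Ozone implementation:

```java
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class AtomicSwapSketch {
  // Write the serialized tree to a ".tmp" sibling, then rename it over the
  // real file. Readers see either the old complete file or the new complete
  // file, which is why readChecksumInfo needs no lock.
  static void writeAtomically(Path target, byte[] serializedTree) throws IOException {
    Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
    try (OutputStream out = Files.newOutputStream(tmp)) {
      out.write(serializedTree);
    }
    Files.move(tmp, target, REPLACE_EXISTING, ATOMIC_MOVE);
  }
}
```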
@@ -23,6 +23,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
 import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DATA_CHECKSUM;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
 import static org.apache.hadoop.ozone.OzoneConsts.DELETE_TRANSACTION_KEY;
@@ -424,6 +425,10 @@ public String getPendingDeleteBlockCountKey() {
     return formatKey(PENDING_DELETE_BLOCK_COUNT);
   }
 
+  public String getContainerDataChecksumKey() {
+    return formatKey(CONTAINER_DATA_CHECKSUM);
+  }
+
   public String getDeletingBlockKeyPrefix() {
     return formatKey(DELETING_KEY_PREFIX);
   }
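The new getter relies on formatKey, whose body is not part of this diff. As a rough illustration: under schema V3 many containers share a single RocksDB instance, so a per-container metadata key such as #DATACHECKSUM has to be namespaced by its container. The sketch below shows the idea with a simplified, readable prefix; the class, fields, and encoding are assumptions, not Ozone's actual code.

```java
// Hedged reconstruction, not the real formatKey. Real Ozone uses a
// fixed-width container prefix; a readable stand-in is used here.
final class KeyFormatSketch {
  private final long containerID;
  private final boolean schemaV3;

  KeyFormatSketch(long containerID, boolean schemaV3) {
    this.containerID = containerID;
    this.schemaV3 = schemaV3;
  }

  String formatKey(String key) {
    // e.g. container 12, "#DATACHECKSUM" -> "12|#DATACHECKSUM" under V3
    return schemaV3 ? containerID + "|" + key : key;
  }
}
```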
@@ -1407,7 +1407,8 @@ private void updateContainerChecksumFromMetadataIfNeeded(Container container) {
    * This method does not send an ICR with the updated checksum info.
    * @param container - Container for which the container merkle tree needs to be updated.
    */
-  private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksumFromMetadata(
+  @VisibleForTesting
+  public ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksumFromMetadata(
       KeyValueContainer container) throws IOException {
     ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
     try (DBHandle dbHandle = BlockUtils.getDB(container.getContainerData(), conf);
@@ -1427,7 +1428,7 @@ private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksumFromMetadata(
 
   private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Container container,
       ContainerMerkleTreeWriter treeWriter, boolean sendICR) throws IOException {
-    ContainerData containerData = container.getContainerData();
+    KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
 
     // Attempt to write the new data checksum to disk. If persisting this fails, keep using the original data
     // checksum to prevent divergence from what SCM sees in the ICR vs what datanode peers will see when pulling the
@@ -1440,6 +1441,17 @@ private ContainerProtos.ContainerChecksumInfo updateAndGetContainerChecksum(Container container,
 
     if (updatedDataChecksum != originalDataChecksum) {
       containerData.setDataChecksum(updatedDataChecksum);
+      try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf)) {
+        // This value is only used during datanode startup. If the update fails, that's okay: the merkle tree and
+        // the in-memory checksum will still agree, and the key will be rewritten the next time the tree is updated,
+        // by either the scanner or reconciliation.
+        dbHandle.getStore().getMetadataTable().put(containerData.getContainerDataChecksumKey(), updatedDataChecksum);
+      } catch (IOException e) {
+        LOG.error("Failed to update container data checksum in RocksDB for container {}. " +
+            "Leaving the original checksum in RocksDB: {}", containerData.getContainerID(),
+            checksumToString(originalDataChecksum), e);
+      }
+
       if (sendICR) {
         sendICR(container);
       }
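The inline comment explains why the RocksDB put above is best-effort: the stored value is only read during datanode startup, and on failure the in-memory checksum and the merkle tree file still agree. The same idea distilled into a standalone sketch; KeyValueStore and the method names are stand-ins, not Ozone APIs.

```java
import java.io.IOException;

// Stand-in for the datanode metadata table; not an Ozone interface.
interface KeyValueStore {
  void put(String key, long value) throws IOException;
}

final class BestEffortPersistSketch {
  static void persistChecksum(KeyValueStore store, String key, long checksum) {
    try {
      // Consulted only at startup, to avoid re-reading the merkle tree file.
      store.put(key, checksum);
    } catch (IOException e) {
      // Deliberately swallowed: the authoritative checksum lives in memory
      // and in the tree file, and the key is rewritten on the next update.
      System.err.println("Best-effort checksum persist failed for " + key + ": " + e);
    }
  }
}
```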
@@ -24,6 +24,7 @@
 import static org.apache.hadoop.hdds.utils.Archiver.readEntry;
 import static org.apache.hadoop.hdds.utils.Archiver.tar;
 import static org.apache.hadoop.hdds.utils.Archiver.untar;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DATA_CHECKSUM_EXTENSION;
 import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
 
 import java.io.File;
@@ -44,7 +45,9 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
@@ -92,8 +95,9 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
     }
 
     Path dbRoot = getDbPath(containerUntarDir, containerData);
-    Path chunksRoot = getChunkPath(containerUntarDir, containerData);
-    byte[] descriptorFileContent = innerUnpack(input, dbRoot, chunksRoot);
+    Path chunksRoot = getChunkPath(containerUntarDir);
+    Path tempContainerMetadataPath = getTempContainerMetadataPath(containerUntarDir, containerData);
+    byte[] descriptorFileContent = innerUnpack(input, dbRoot, chunksRoot, tempContainerMetadataPath);
 
     if (!Files.exists(destContainerDir)) {
       Files.createDirectories(destContainerDir);
@@ -111,9 +115,6 @@
     // Before the atomic move, the destination dir is empty and doesn't have a metadata directory.
     // Writing the .container file will fail as the metadata dir doesn't exist.
     // So we instead save the container file to the containerUntarDir.
-    Path containerMetadataPath = Paths.get(container.getContainerData().getMetadataPath());
-    Path tempContainerMetadataPath = Paths.get(containerUntarDir.toString(),
-        containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1).toString());
     persistCustomContainerState(container, descriptorFileContent, State.RECOVERING, tempContainerMetadataPath);
     Files.move(containerUntarDir, destContainerDir,
         StandardCopyOption.ATOMIC_MOVE,
@@ -146,6 +147,11 @@ public void pack(Container<KeyValueContainerData> container,
     includeFile(container.getContainerFile(), CONTAINER_FILE_NAME,
         archiveOutput);
 
+    File containerChecksumFile = ContainerChecksumTreeManager.getContainerChecksumFile(containerData);
+    if (containerChecksumFile.exists()) {
+      includeFile(containerChecksumFile, containerChecksumFile.getName(), archiveOutput);
+    }
+
     includePath(getDbPath(containerData), DB_DIR_NAME,
         archiveOutput);
 
@@ -202,11 +208,20 @@ public static Path getDbPath(Path baseDir,
     }
   }
 
-  public static Path getChunkPath(Path baseDir,
-      KeyValueContainerData containerData) {
+  public static Path getChunkPath(Path baseDir) {
     return KeyValueContainerLocationUtil.getChunksLocationPath(baseDir.toString()).toPath();
   }
 
+  private Path getContainerMetadataPath(ContainerData containerData) {
+    return Paths.get(containerData.getMetadataPath());
+  }
+
+  private Path getTempContainerMetadataPath(Path containerUntarDir, ContainerData containerData) {
+    Path containerMetadataPath = getContainerMetadataPath(containerData);
+    return Paths.get(containerUntarDir.toString(),
+        containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1).toString());
+  }
+
   InputStream decompress(InputStream input) throws IOException {
     return compression.wrap(input);
   }
@@ -215,7 +230,7 @@ OutputStream compress(OutputStream output) throws IOException {
     return compression.wrap(output);
   }
 
-  private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot)
+  private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot, Path metadataRoot)
       throws IOException {
     byte[] descriptorFileContent = null;
     try (ArchiveInputStream<TarArchiveEntry> archiveInput = untar(decompress(input))) {
@@ -233,6 +248,10 @@ private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot)
             .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
         extractEntry(entry, archiveInput, size, chunksRoot,
             destinationPath);
+      } else if (name.endsWith(CONTAINER_DATA_CHECKSUM_EXTENSION)) {
+        Path destinationPath = metadataRoot.resolve(name);
+        extractEntry(entry, archiveInput, size, metadataRoot,
+            destinationPath);
       } else if (CONTAINER_FILE_NAME.equals(name)) {
         //Don't do anything. Container file should be unpacked in a
         //separated step by unpackContainerDescriptor call.
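The new else-if branch routes any archive entry whose name ends in the checksum extension into the container's metadata directory during unpack. Below is a self-contained sketch of the same suffix-based dispatch using commons-compress directly; the class and method names are illustrative, and the real packer also handles the DB directory and the container descriptor.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

final class EntryRoutingSketch {
  // Route each tar entry to a destination root chosen from its name:
  // checksum tree files go under the metadata root, everything else under
  // the chunks root.
  static void unpack(InputStream in, Path chunksRoot, Path metadataRoot) throws IOException {
    try (TarArchiveInputStream tar = new TarArchiveInputStream(in)) {
      TarArchiveEntry entry;
      while ((entry = tar.getNextTarEntry()) != null) {
        if (entry.isDirectory()) {
          continue;
        }
        String name = entry.getName();
        Path root = name.endsWith(".tree") ? metadataRoot : chunksRoot;
        Path dest = root.resolve(name).normalize();
        if (!dest.startsWith(root)) {
          throw new IOException("Entry escapes destination: " + name); // traversal guard
        }
        Files.createDirectories(dest.getParent());
        Files.copy(tar, dest, StandardCopyOption.REPLACE_EXISTING);
      }
    }
  }
}
```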
@@ -26,7 +26,6 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Optional;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -260,10 +259,10 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData,
     DatanodeStore store = null;
     try {
       try {
-        boolean readOnly = ContainerInspectorUtil.isReadOnly(
-            ContainerProtos.ContainerType.KeyValueContainer);
-        store = BlockUtils.getUncachedDatanodeStore(
-            kvContainerData, config, readOnly);
+        // Open RocksDB in write mode, as it is required for container checksum updates and inspector repair operations.
+        // The method KeyValueContainerMetadataInspector.buildErrorAndRepair will determine if write access to the DB
+        // is permitted based on the mode.
+        store = BlockUtils.getUncachedDatanodeStore(kvContainerData, config, false);
       } catch (IOException e) {
         // If an exception is thrown, then it may indicate the RocksDB is
         // already open in the container cache. As this code is only executed at
@@ -288,17 +287,25 @@
     }
   }
 
-  private static void populateContainerDataChecksum(KeyValueContainerData kvContainerData) {
+  private static void loadAndSetContainerDataChecksum(KeyValueContainerData kvContainerData,
+      Table<String, Long> metadataTable) {
     if (kvContainerData.isOpen()) {
       return;
     }
 
     try {
-      Optional<ContainerChecksumInfo> optionalContainerChecksumInfo = ContainerChecksumTreeManager
-          .readChecksumInfo(kvContainerData);
-      if (optionalContainerChecksumInfo.isPresent()) {
-        ContainerChecksumInfo containerChecksumInfo = optionalContainerChecksumInfo.get();
-        kvContainerData.setDataChecksum(containerChecksumInfo.getContainerMerkleTree().getDataChecksum());
+      Long containerDataChecksum = metadataTable.get(kvContainerData.getContainerDataChecksumKey());
+      if (containerDataChecksum != null && kvContainerData.needsDataChecksum()) {
+        kvContainerData.setDataChecksum(containerDataChecksum);
+        return;
+      }
+
+      ContainerChecksumInfo containerChecksumInfo = ContainerChecksumTreeManager.readChecksumInfo(kvContainerData);
+      if (containerChecksumInfo != null && containerChecksumInfo.hasContainerMerkleTree()
+          && kvContainerData.needsDataChecksum()) {
+        containerDataChecksum = containerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+        kvContainerData.setDataChecksum(containerDataChecksum);
+        metadataTable.put(kvContainerData.getContainerDataChecksumKey(), containerDataChecksum);
       }
     } catch (IOException ex) {
       LOG.warn("Failed to read checksum info for container {}", kvContainerData.getContainerID(), ex);
@@ -375,14 +382,15 @@ private static void populateContainerMetadata(
       kvContainerData.markAsEmpty();
     }
 
+    loadAndSetContainerDataChecksum(kvContainerData, metadataTable);
+
     // Run advanced container inspection/repair operations if specified on
     // startup. If this method is called but not as a part of startup,
     // The inspectors will be unloaded and this will be a no-op.
     ContainerInspectorUtil.process(kvContainerData, store);
 
     // Load finalizeBlockLocalIds for container in memory.
     populateContainerFinalizeBlock(kvContainerData, store);
-    populateContainerDataChecksum(kvContainerData);
   }
 
   /**
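The rewritten loader tries the cheap RocksDB point lookup first, falls back to parsing the full checksum tree file, and backfills RocksDB so the next startup takes the fast path. A minimal sketch of that read-then-backfill shape, with stand-in types rather than Ozone APIs:

```java
import java.io.IOException;
import java.util.OptionalLong;

// Stand-ins for the two checksum sources; not Ozone interfaces.
interface ChecksumSources {
  OptionalLong fromDb() throws IOException;        // cheap point lookup
  OptionalLong fromTreeFile() throws IOException;  // parse the full protobuf
  void backfillDb(long checksum) throws IOException;
}

final class ChecksumLoadSketch {
  static OptionalLong load(ChecksumSources sources) throws IOException {
    OptionalLong fromDb = sources.fromDb();
    if (fromDb.isPresent()) {
      return fromDb;                 // fast path: skip reading the tree file
    }
    OptionalLong fromFile = sources.fromTreeFile();
    if (fromFile.isPresent()) {
      sources.backfillDb(fromFile.getAsLong());  // warm the DB for next boot
    }
    return fromFile;
  }
}
```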
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.checksum;
 
 import static org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -36,13 +37,17 @@
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
@@ -346,4 +351,38 @@ public static void writeContainerDataTreeProto(ContainerData data, ContainerProtos.ContainerChecksumInfo checksumInfo) {
     }
     data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
   }
+
+  /**
+   * Verifies that the in-memory data checksum in the container data matches the one stored in the
+   * checksum tree file and in RocksDB.
+   *
+   * @param containerData The container data to verify.
+   * @param conf The Ozone configuration.
+   * @throws IOException If an error occurs while reading the checksum info or RocksDB.
+   */
+  public static void verifyAllDataChecksumsMatch(KeyValueContainerData containerData, OzoneConfiguration conf)
+      throws IOException {
+    assertNotNull(containerData, "Container data should not be null");
+    ContainerProtos.ContainerChecksumInfo containerChecksumInfo = ContainerChecksumTreeManager
+        .readChecksumInfo(containerData);
+    assertNotNull(containerChecksumInfo);
+    long dataChecksum = containerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+    Long dbDataChecksum;
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf)) {
+      dbDataChecksum = dbHandle.getStore().getMetadataTable().get(containerData.getContainerDataChecksumKey());
+    }
+
+    if (containerData.getDataChecksum() == 0) {
+      assertEquals(containerData.getDataChecksum(), dataChecksum);
+      // The RocksDB checksum can be null if the file doesn't exist or when the file is created by
+      // the block deleting service. A 0 checksum will be stored when the container is loaded without
+      // a merkle tree.
+      assertThat(dbDataChecksum).isIn(0L, null);
+    } else {
+      // The in-memory, container merkle tree file, and RocksDB checksums should all be equal.
+      assertEquals(containerData.getDataChecksum(), dataChecksum, "In-memory data checksum should match " +
+          "the one in the checksum file.");
+      assertEquals(dbDataChecksum, dataChecksum);
+    }
+  }
 }
@@ -97,6 +97,8 @@ public void init() {
     container = mock(KeyValueContainerData.class);
     when(container.getContainerID()).thenReturn(CONTAINER_ID);
     when(container.getMetadataPath()).thenReturn(testDir.getAbsolutePath());
+    // File name is hardcoded here to check if the file name has been changed, since this would
+    // need additional compatibility handling.
     checksumFile = new File(testDir, CONTAINER_ID + ".tree");
     checksumManager = new ContainerChecksumTreeManager(new OzoneConfiguration());
     config = new OzoneConfiguration();