Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.checksum;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static org.apache.hadoop.hdds.HddsUtils.checksumToString;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -81,35 +82,46 @@ public void stop() {
* The data merkle tree within the file is replaced with the {@code tree} parameter, but all other content of the
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
* This method also updates the container's data checksum in the {@code data} parameter, which will be seen by SCM
* on container reports.
*/
public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerData data,
ContainerMerkleTreeWriter tree)
throws IOException {
ContainerMerkleTreeWriter tree) throws IOException {
long containerID = data.getContainerID();
// If there is an error generating the tree and we cannot obtain a final checksum, use 0 to indicate a metadata
// failure.
long dataChecksum = 0;
ContainerProtos.ContainerChecksumInfo checksumInfo = null;
Lock writeLock = getLock(containerID);
writeLock.lock();
try {
ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
try {
// If the file is not present, we will create the data for the first time. This happens under a write lock.
checksumInfoBuilder = readBuilder(data)
.orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
checksumInfoBuilder = readBuilder(data).orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
} catch (IOException ex) {
LOG.error("Failed to read container checksum tree file for container {}. Overwriting it with a new instance.",
LOG.error("Failed to read container checksum tree file for container {}. Creating a new instance.",
containerID, ex);
checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder();
}

ContainerProtos.ContainerChecksumInfo checksumInfo = checksumInfoBuilder
ContainerProtos.ContainerMerkleTree treeProto = captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
tree::toProto);
checksumInfoBuilder
.setContainerID(containerID)
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto))
.build();
.setContainerMerkleTree(treeProto);
checksumInfo = checksumInfoBuilder.build();
write(data, checksumInfo);
LOG.debug("Data merkle tree for container {} updated", containerID);
return checksumInfo;
// If write succeeds, update the checksum in memory. Otherwise 0 will be used to indicate the metadata failure.
dataChecksum = treeProto.getDataChecksum();
LOG.debug("Merkle tree for container {} updated with container data checksum {}", containerID,
checksumToString(dataChecksum));
} finally {
// Even if persisting the tree fails, we should still update the data checksum in memory to report back to SCM.
data.setDataChecksum(dataChecksum);
writeLock.unlock();
}
return checksumInfo;
}

/**
Expand Down Expand Up @@ -296,6 +308,17 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
// chunks from us when they reconcile.
}

public static long getDataChecksum(ContainerProtos.ContainerChecksumInfo checksumInfo) {
return checksumInfo.getContainerMerkleTree().getDataChecksum();
}

/**
* Returns whether the container checksum tree file for the specified container exists without deserializing it.
*/
public static boolean hasContainerChecksumFile(ContainerData data) {
return getContainerChecksumFile(data).exists();
}

/**
* Returns the container checksum tree file for the specified container without deserializing it.
*/
Expand Down Expand Up @@ -354,8 +377,6 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che
throw new IOException("Error occurred when writing container merkle tree for containerID "
+ data.getContainerID(), ex);
}
// Set in-memory data checksum.
data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this is also being used by markBlockAsDeleted in case of failure in that code flow we also want set the checksum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it from here because markBlocksAsDeleted does not affect the merkle tree and should not change the data checksum.

}

/**
Expand Down Expand Up @@ -401,7 +422,7 @@ public ContainerMerkleTreeMetrics getMetrics() {
return this.metrics;
}

public static boolean checksumFileExist(Container container) {
public static boolean checksumFileExist(Container<?> container) {
File checksumFile = getContainerChecksumFile(container.getContainerData());
return checksumFile.exists();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,66 @@ public class ContainerMerkleTreeWriter {
public static final Supplier<ChecksumByteBuffer> CHECKSUM_BUFFER_SUPPLIER = ChecksumByteBufferFactory::crc32CImpl;

/**
* Constructs an empty Container merkle tree object.
* Constructs a writer for an initially empty container merkle tree.
*/
public ContainerMerkleTreeWriter() {
id2Block = new TreeMap<>();
}

/**
* Constructs a writer for a container merkle tree which initially contains all the information from the specified
* proto.
*/
public ContainerMerkleTreeWriter(ContainerProtos.ContainerMerkleTree fromTree) {
id2Block = new TreeMap<>();
for (ContainerProtos.BlockMerkleTree blockTree: fromTree.getBlockMerkleTreeList()) {
long blockID = blockTree.getBlockID();
addBlock(blockID);
for (ContainerProtos.ChunkMerkleTree chunkTree: blockTree.getChunkMerkleTreeList()) {
addChunks(blockID, chunkTree);
}
}
}

/**
* Adds chunks to a block in the tree. The block entry will be created if it is the first time adding chunks to it.
* If the block entry already exists, the chunks will be added to the existing chunks for that block.
*
* @param blockID The ID of the block that these chunks belong to.
* @param healthy True if there were no errors detected with these chunks. False indicates that all the chunks
* being added had errors.
* @param chunks A list of chunks to add to this block. The chunks will be sorted internally by their offset.
*/
public void addChunks(long blockID, Collection<ContainerProtos.ChunkInfo> chunks) {
id2Block.computeIfAbsent(blockID, BlockMerkleTreeWriter::new).addChunks(chunks);
public void addChunks(long blockID, boolean healthy, Collection<ContainerProtos.ChunkInfo> chunks) {
for (ContainerProtos.ChunkInfo chunk: chunks) {
addChunks(blockID, healthy, chunk);
}
}

public void addChunks(long blockID, boolean healthy, ContainerProtos.ChunkInfo... chunks) {
for (ContainerProtos.ChunkInfo chunk: chunks) {
addChunks(blockID, new ChunkMerkleTreeWriter(chunk, healthy));
}
}

private void addChunks(long blockID, ContainerProtos.ChunkMerkleTree... chunks) {
for (ContainerProtos.ChunkMerkleTree chunkTree: chunks) {
addChunks(blockID, new ChunkMerkleTreeWriter(chunkTree));
}
}

private void addChunks(long blockID, ChunkMerkleTreeWriter chunkWriter) {
id2Block.computeIfAbsent(blockID, BlockMerkleTreeWriter::new).addChunks(chunkWriter);
}

/**
* Adds an empty block to the tree. This method is not a pre-requisite to {@code addChunks}.
* If the block entry already exists, it will not be modified.
*
* @param blockID The ID of the empty block to add to the tree
*/
public void addBlock(long blockID) {
id2Block.computeIfAbsent(blockID, BlockMerkleTreeWriter::new);
}

/**
Expand Down Expand Up @@ -112,9 +157,9 @@ private static class BlockMerkleTreeWriter {
*
* @param chunks A list of chunks to add to this block.
*/
public void addChunks(Collection<ContainerProtos.ChunkInfo> chunks) {
for (ContainerProtos.ChunkInfo chunk: chunks) {
offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTreeWriter(chunk));
public void addChunks(ChunkMerkleTreeWriter... chunks) {
for (ChunkMerkleTreeWriter chunk: chunks) {
offset2Chunk.put(chunk.getOffset(), chunk);
}
}

Expand Down Expand Up @@ -160,17 +205,28 @@ private static class ChunkMerkleTreeWriter {
private final boolean isHealthy;
private final long dataChecksum;

ChunkMerkleTreeWriter(ContainerProtos.ChunkInfo chunk) {
ChunkMerkleTreeWriter(ContainerProtos.ChunkInfo chunk, boolean healthy) {
length = chunk.getLen();
offset = chunk.getOffset();
isHealthy = true;
isHealthy = healthy;
ChecksumByteBuffer checksumImpl = CHECKSUM_BUFFER_SUPPLIER.get();
for (ByteString checksum: chunk.getChecksumData().getChecksumsList()) {
checksumImpl.update(checksum.asReadOnlyByteBuffer());
}
this.dataChecksum = checksumImpl.getValue();
}

ChunkMerkleTreeWriter(ContainerProtos.ChunkMerkleTree chunkTree) {
length = chunkTree.getLength();
offset = chunkTree.getOffset();
isHealthy = chunkTree.getIsHealthy();
dataChecksum = chunkTree.getDataChecksum();
}

public long getOffset() {
return offset;
}

/**
* Computes a single hash for this ChunkInfo object. All chunk level checksum computation happens within this
* method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
import java.util.Collection;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
Expand Down Expand Up @@ -201,7 +201,7 @@ public abstract void deleteContainer(Container container, boolean force)
* @param peers The other datanodes with a copy of this container whose data should be checked.
*/
public abstract void reconcileContainer(DNContainerOperationClient dnClient, Container<?> container,
Set<DatanodeDetails> peers) throws IOException;
Collection<DatanodeDetails> peers) throws IOException;

/**
* Deletes the given files associated with a block of the container.
Expand Down
Loading
Loading