Merged
Commits (25)
e294940
HDDS-11763. Implement container repair logic within datanodes
aswinshakil Nov 22, 2024
99de480
HDDS-11763. Added test cases and fixed bugs.
aswinshakil Jan 6, 2025
f8606fe
HDDS-11763. Fix tests.
aswinshakil Jan 8, 2025
52bde04
Merge branch 'HDDS-10239-container-reconciliation' of https://github.…
aswinshakil Jan 24, 2025
385e8c1
Address partial review comments.
aswinshakil Jan 25, 2025
04aa8a0
Address review comments.
aswinshakil Jan 31, 2025
4f291a0
Merge branch 'HDDS-10239-container-reconciliation' into HDDS-11763-re…
aswinshakil Feb 10, 2025
112762d
Address review.
aswinshakil Feb 18, 2025
07c40e4
Fix tests.
aswinshakil Feb 19, 2025
0350c29
Merge branch 'HDDS-10239-container-reconciliation' of https://github.…
aswinshakil Feb 19, 2025
aa34c57
Address Review Comments
aswinshakil Feb 21, 2025
1e5685e
Merge branch 'HDDS-10239-container-reconciliation' of https://github.…
aswinshakil Mar 7, 2025
546bd6f
Add unit test suite
aswinshakil Mar 10, 2025
89a9848
Fix findbugs.
aswinshakil Mar 10, 2025
2b272e7
Fix findbugs.
aswinshakil Mar 10, 2025
7570005
Address review comments.
aswinshakil Mar 18, 2025
47b5fef
Use BlockInputStream to read data.
aswinshakil Mar 20, 2025
caffe21
Fix findbugs
aswinshakil Mar 20, 2025
12b9443
Use existing blockData from BlockInputStream
aswinshakil Mar 20, 2025
d03e4d9
Use ByteBuffer instead of byte array.
aswinshakil Mar 24, 2025
369b24d
Address review comments.
aswinshakil Apr 4, 2025
f062bed
Address review comments.
aswinshakil Apr 4, 2025
a5796a1
Update tests and address review comments.
aswinshakil Apr 8, 2025
d578f3c
Address review comments.
aswinshakil Apr 10, 2025
130d57a
Address review comments.
aswinshakil Apr 10, 2025
@@ -112,6 +112,8 @@ public class BlockInputStream extends BlockExtendedInputStream {

private final Function<BlockID, BlockLocationInfo> refreshFunction;

private BlockData blockData;

public BlockInputStream(
BlockLocationInfo blockInfo,
Pipeline pipeline,
@@ -153,7 +155,6 @@ public synchronized void initialize() throws IOException {
return;
}

BlockData blockData = null;
List<ChunkInfo> chunks = null;
IOException catchEx = null;
do {
@@ -554,8 +555,7 @@ public long getLength() {
return length;
}

@VisibleForTesting
synchronized int getChunkIndex() {
public synchronized int getChunkIndex() {
return chunkIndex;
}

@@ -618,9 +618,12 @@ private void handleReadError(IOException cause) throws IOException {
refreshBlockInfo(cause);
}

@VisibleForTesting
public synchronized List<ChunkInputStream> getChunkStreams() {
return chunkStreams;
}

public BlockData getStreamBlockData() {
return blockData;
}

}
@@ -747,4 +747,8 @@ public synchronized void unbuffer() {
public ByteBuffer[] getCachedBuffers() {
return BufferUtils.getReadOnlyByteBuffers(buffers);
}

public ChunkInfo getChunkInfo() {
return chunkInfo;
}
}
@@ -882,6 +882,11 @@ public static HddsProtos.UUID toProtobuf(UUID uuid) {
: null;
}

/** @return Hex string representation of {@code value} */
public static String checksumToString(long value) {
return Long.toHexString(value);
}

/**
* Logs a warning to report that the class is not closed properly.
*/
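A brief usage note on the new helper, under the assumption that data checksums are plain long values as elsewhere in this patch; the variable name dataChecksum is illustrative:

// checksumToString renders the value as unsigned hex, matching the JSON serializer change later in this diff:
// checksumToString(0x1f2eL) returns "1f2e"; checksumToString(-1L) returns "ffffffffffffffff".
String hex = HddsUtils.checksumToString(dataChecksum);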
@@ -112,6 +112,7 @@ static HddsProtos.ReplicationFactor getLegacyFactor(
return ((ReplicatedReplicationConfig) replicationConfig)
.getReplicationFactor();
}

throw new UnsupportedOperationException(
"Replication configuration of type "
+ replicationConfig.getReplicationType()
@@ -17,6 +17,8 @@

package org.apache.hadoop.hdds.scm.container;

import static org.apache.hadoop.hdds.HddsUtils.checksumToString;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
@@ -26,7 +28,6 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;


/**
* Class which stores ContainerReplica details on the client.
*/
@@ -102,7 +103,7 @@ public long getDataChecksum() {
private static class LongToHexJsonSerializer extends JsonSerializer<Long> {
@Override
public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
gen.writeString(Long.toHexString(value));
gen.writeString(checksumToString(value));
}
}

@@ -82,7 +82,9 @@ public void stop() {
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
*/
public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter tree) throws IOException {
public ContainerProtos.ContainerChecksumInfo writeContainerDataTree(ContainerData data,
ContainerMerkleTreeWriter tree)
throws IOException {
long containerID = data.getContainerID();
Lock writeLock = getLock(containerID);
writeLock.lock();
@@ -98,11 +100,13 @@ public void writeContainerDataTree(ContainerData data, ContainerMerkleTreeWriter
checksumInfoBuilder = ContainerProtos.ContainerChecksumInfo.newBuilder();
}
Review comment (Contributor):
nit: Not related to this patch, but we should create a helper function, something like readOrCreate, to handle all the create scenarios for checksumInfoBuilder. And reuse it in markBlocksAsDeleted.
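A rough sketch of the helper this comment suggests, reusing the read(ContainerData) method that this patch makes public; the name readOrCreate and its exact shape are hypothetical and not part of the change:

// Hypothetical helper inside ContainerChecksumTreeManager: load the container's existing
// checksum info if the file is present, otherwise start a fresh builder seeded with the container ID.
private ContainerProtos.ContainerChecksumInfo.Builder readOrCreate(ContainerData data) throws IOException {
  Optional<ContainerProtos.ContainerChecksumInfo> existing = read(data);
  return existing.map(ContainerProtos.ContainerChecksumInfo::toBuilder)
      .orElseGet(() -> ContainerProtos.ContainerChecksumInfo.newBuilder()
          .setContainerID(data.getContainerID()));
}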
checksumInfoBuilder
ContainerProtos.ContainerChecksumInfo checksumInfo = checksumInfoBuilder
.setContainerID(containerID)
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto));
write(data, checksumInfoBuilder.build());
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(), tree::toProto))
.build();
write(data, checksumInfo);
LOG.debug("Data merkle tree for container {} updated", containerID);
return checksumInfo;
} finally {
writeLock.unlock();
}
@@ -146,33 +150,32 @@ public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> del
}
}

public ContainerDiffReport diff(KeyValueContainerData thisContainer,
/**
* Compares the checksum info of the container with the peer's checksum info and returns a report of the differences.
* @param thisChecksumInfo The checksum info of the container on this datanode.
* @param peerChecksumInfo The checksum info of the container on the peer datanode.
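* @return A ContainerDiffReport describing how this replica differs from the peer's replica.
* @throws StorageContainerException if the container IDs do not match or the diff fails.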
*/
public ContainerDiffReport diff(ContainerProtos.ContainerChecksumInfo thisChecksumInfo,
ContainerProtos.ContainerChecksumInfo peerChecksumInfo) throws
StorageContainerException {

ContainerDiffReport report = new ContainerDiffReport();
try {
captureLatencyNs(metrics.getMerkleTreeDiffLatencyNS(), () -> {
Preconditions.assertNotNull(thisContainer, "Container data is null");
Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null");
Optional<ContainerProtos.ContainerChecksumInfo> thisContainerChecksumInfo = read(thisContainer);
if (!thisContainerChecksumInfo.isPresent()) {
throw new StorageContainerException("The container #" + thisContainer.getContainerID() +
" doesn't have container checksum", ContainerProtos.Result.IO_EXCEPTION);
}

if (thisContainer.getContainerID() != peerChecksumInfo.getContainerID()) {
throw new StorageContainerException("Container Id does not match for container "
+ thisContainer.getContainerID(), ContainerProtos.Result.CONTAINER_ID_MISMATCH);
Preconditions.assertNotNull(thisChecksumInfo, "Datanode's checksum info is null.");
Preconditions.assertNotNull(peerChecksumInfo, "Peer checksum info is null.");
if (thisChecksumInfo.getContainerID() != peerChecksumInfo.getContainerID()) {
throw new StorageContainerException("Container ID does not match. Local container ID "
+ thisChecksumInfo.getContainerID() + " , Peer container ID " + peerChecksumInfo.getContainerID(),
ContainerProtos.Result.CONTAINER_ID_MISMATCH);
}

ContainerProtos.ContainerChecksumInfo thisChecksumInfo = thisContainerChecksumInfo.get();
compareContainerMerkleTree(thisChecksumInfo, peerChecksumInfo, report);
});
} catch (IOException ex) {
metrics.incrementMerkleTreeDiffFailures();
throw new StorageContainerException("Container Diff failed for container #" + thisContainer.getContainerID(), ex,
ContainerProtos.Result.IO_EXCEPTION);
throw new StorageContainerException("Container Diff failed for container #" + thisChecksumInfo.getContainerID(),
ex, ContainerProtos.Result.IO_EXCEPTION);
}

// Update Container Diff metrics based on the diff report.
@@ -314,7 +317,7 @@ private Lock getLock(long containerID) {
* Callers are not required to hold a lock while calling this since writes are done to a tmp file and atomically
* swapped into place.
*/
private Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
public Optional<ContainerProtos.ContainerChecksumInfo> read(ContainerData data) throws IOException {
long containerID = data.getContainerID();
File checksumFile = getContainerChecksumFile(data);
try {
@@ -361,6 +364,8 @@ private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo che
throw new IOException("Error occurred when writing container merkle tree for containerID "
+ data.getContainerID(), ex);
}
// Set in-memory data checksum.
data.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
}

/**
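A usage sketch for the reworked API, assuming the caller (for example the reconciliation path) has already fetched the peer's ContainerChecksumInfo; checksumManager, containerData and peerChecksumInfo are stand-in names:

// Illustrative only: read the local tree and compare it against the peer's copy.
Optional<ContainerProtos.ContainerChecksumInfo> localInfo = checksumManager.read(containerData);
if (localInfo.isPresent()) {
  ContainerDiffReport report = checksumManager.diff(localInfo.get(), peerChecksumInfo);
  // The report describes how this replica diverges from the peer; the repair step
  // would use it to decide which blocks and chunks to re-read from the peer.
}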
@@ -84,6 +84,10 @@ public XceiverClientManager getXceiverClientManager() {
return xceiverClientManager;
}

public TokenHelper getTokenHelper() {
return tokenHelper;
}

/**
* Reads {@link ContainerProtos.ContainerChecksumInfo} for a specified container for the specified datanode.
*
@@ -56,6 +56,8 @@ public class ContainerMetrics implements Closeable {
@Metric private MutableCounterLong containerForceDelete;
@Metric private MutableCounterLong numReadStateMachine;
@Metric private MutableCounterLong bytesReadStateMachine;
@Metric private MutableCounterLong numContainerReconciledWithoutChanges;
@Metric private MutableCounterLong numContainerReconciledWithChanges;


private final EnumMap<ContainerProtos.Type, MutableCounterLong> numOpsArray;
@@ -172,4 +174,12 @@ public void incBytesReadStateMachine(long bytes) {
public long getBytesReadStateMachine() {
return bytesReadStateMachine.value();
}

public void incContainerReconciledWithoutChanges() {
numContainerReconciledWithoutChanges.incr();
}

public void incContainerReconciledWithChanges() {
numContainerReconciledWithChanges.incr();
}
}
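A hedged sketch of how the reconciliation path might drive the two new counters; reconcileWithPeer is a placeholder, only the metric methods and the ContainerData checksum accessor come from this patch:

// Hypothetical call site: record whether reconciling with a peer changed the data checksum.
long oldDataChecksum = containerData.getDataChecksum();
reconcileWithPeer(containerData, peer);   // placeholder for the actual repair step
if (containerData.getDataChecksum() == oldDataChecksum) {
  metrics.incContainerReconciledWithoutChanges();
} else {
  metrics.incContainerReconciledWithChanges();
}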
@@ -17,7 +17,10 @@

package org.apache.hadoop.ozone.container.common.utils;

import static org.apache.hadoop.hdds.HddsUtils.checksumToString;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
import org.apache.logging.log4j.LogManager;
@@ -145,6 +148,23 @@ public static void logRecovered(ContainerData containerData) {
LOG.info(getMessage(containerData));
}

/**
* Logged when a container is reconciled.
*
* @param containerData The container that was reconciled on this datanode.
* @param oldDataChecksum The old data checksum.
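* @param peer The peer datanode that this container was reconciled with.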
*/
public static void logReconciled(ContainerData containerData, long oldDataChecksum, DatanodeDetails peer) {
if (containerData.getDataChecksum() == oldDataChecksum) {
LOG.info(getMessage(containerData, "Container reconciled with peer " + peer.toString() +
". No change in checksum."));
} else {
LOG.warn(getMessage(containerData, "Container reconciled with peer " + peer.toString() +
". Checksum updated from " + checksumToString(oldDataChecksum) + " to "
+ checksumToString(containerData.getDataChecksum())));
}
}

private static String getMessage(ContainerData containerData,
String message) {
return String.join(FIELD_SEPARATOR, getMessage(containerData), message);
@@ -155,6 +175,7 @@ private static String getMessage(ContainerData containerData) {
"ID=" + containerData.getContainerID(),
"Index=" + containerData.getReplicaIndex(),
"BCSID=" + containerData.getBlockCommitSequenceId(),
"State=" + containerData.getState());
"State=" + containerData.getState(),
"DataChecksum=" + checksumToString(containerData.getDataChecksum()));
}
}
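A matching sketch for the logging hook, again with a placeholder reconcile step; logReconciled then logs at INFO or WARN depending on whether the checksum changed, as shown above:

// Hypothetical usage: capture the checksum before reconciliation, then log the outcome per peer.
long oldDataChecksum = containerData.getDataChecksum();
reconcileWithPeer(containerData, peer);   // placeholder
ContainerLogger.logReconciled(containerData, oldDataChecksum, peer);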