Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1582,114 +1582,122 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);

for (DatanodeDetails peer : peers) {
long numMissingBlocksRepaired = 0;
long numCorruptChunksRepaired = 0;
long numMissingChunksRepaired = 0;
// This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the
// next peer.
ContainerMerkleTreeWriter updatedTreeWriter =
new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());

LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}",
containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
// Data checksum updated after each peer reconciles.
long start = Instant.now().toEpochMilli();
ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
containerID, peer);
if (peerChecksumInfo == null) {
LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
containerID, peer);
continue;
}
try {
long numMissingBlocksRepaired = 0;
long numCorruptChunksRepaired = 0;
long numMissingChunksRepaired = 0;

LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}",
containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
// Data checksum updated after each peer reconciles.
long start = Instant.now().toEpochMilli();
ContainerProtos.ContainerChecksumInfo peerChecksumInfo;


// Data checksum updated after each peer reconciles.
peerChecksumInfo = dnClient.getContainerChecksumInfo(containerID, peer);
if (peerChecksumInfo == null) {
LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
containerID, peer);
continue;
}

ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
Pipeline pipeline = createSingleNodePipeline(peer);
// This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with
// the next peer.
ContainerMerkleTreeWriter updatedTreeWriter =
new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());
ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
Pipeline pipeline = createSingleNodePipeline(peer);

// Handle missing blocks
for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
long localID = missingBlock.getBlockID();
BlockID blockID = new BlockID(containerID, localID);
if (getBlockManager().blockExists(container, blockID)) {
LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
"present. Our container merkle tree is stale.", localID, containerID);
} else {
// Handle missing blocks
for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
try {
long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
if (chunksInBlockRetrieved != 0) {
allBlocksUpdated.add(localID);
numMissingBlocksRepaired++;
long localID = missingBlock.getBlockID();
BlockID blockID = new BlockID(containerID, localID);
if (getBlockManager().blockExists(container, blockID)) {
LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
"present. Our container merkle tree is stale.", localID, containerID);
} else {
long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
if (chunksInBlockRetrieved != 0) {
allBlocksUpdated.add(localID);
numMissingBlocksRepaired++;
}
}
} catch (IOException e) {
LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
containerID, e);
containerID, e);
}
}
}

// Handle missing chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
long localID = entry.getKey();
try {
long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (missingChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numMissingChunksRepaired += missingChunksRepaired;
// Handle missing chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
long localID = entry.getKey();
try {
long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (missingChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numMissingChunksRepaired += missingChunksRepaired;
}
} catch (IOException e) {
LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
} catch (IOException e) {
LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
}

// Handle corrupt chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
long localID = entry.getKey();
try {
long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (corruptChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numCorruptChunksRepaired += corruptChunksRepaired;
// Handle corrupt chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
long localID = entry.getKey();
try {
long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (corruptChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numCorruptChunksRepaired += corruptChunksRepaired;
}
} catch (IOException e) {
LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
} catch (IOException e) {
LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
}

// Based on repairs done with this peer, write the updated merkle tree to the container.
// This updated tree will be used when we reconcile with the next peer.
ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo;
latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false);

// Log the results of reconciliation with this peer.
long duration = Instant.now().toEpochMilli() - start;
long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo);
long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
if (previousDataChecksum == latestDataChecksum) {
if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) {
// This condition should never happen.
LOG.error("Checksum of container was not updated but blocks were repaired.");
// Based on repairs done with this peer, write the updated merkle tree to the container.
// This updated tree will be used when we reconcile with the next peer.
ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo;
latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false);

// Log the results of reconciliation with this peer.
long duration = Instant.now().toEpochMilli() - start;
long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo);
long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
if (previousDataChecksum == latestDataChecksum) {
if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) {
// This condition should never happen.
LOG.error("Checksum of container was not updated but blocks were repaired.");
}
LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), duration);
} else {
LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" +
".\nMissing blocks repaired: {}/{}\n" +
"Missing chunks repaired: {}/{}\n" +
"Corrupt chunks repaired: {}/{}\n" +
"Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum),
numMissingBlocksRepaired, diffReport.getMissingBlocks().size(),
numMissingChunksRepaired, diffReport.getMissingChunks().size(),
numCorruptChunksRepaired, diffReport.getCorruptChunks().size(),
duration);
}
LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), duration);
} else {
LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" +
".\nMissing blocks repaired: {}/{}\n" +
"Missing chunks repaired: {}/{}\n" +
"Corrupt chunks repaired: {}/{}\n" +
"Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum),
numMissingBlocksRepaired, diffReport.getMissingBlocks().size(),
numMissingChunksRepaired, diffReport.getMissingChunks().size(),
numCorruptChunksRepaired, diffReport.getCorruptChunks().size(),
duration);
}

ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
successfulPeerCount++;
ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
successfulPeerCount++;
} catch (IOException ex) {
LOG.error("Failed to reconcile with peer {} for container #{}. Skipping to next peer.",
peer, containerID, ex);
}
}

// Log a summary after reconciling with all peers.
Expand All @@ -1709,8 +1717,7 @@ containerID, peer, checksumToString(previousDataChecksum), checksumToString(late
}

// Trigger on demand scanner, which will build the merkle tree based on the newly ingested data.
containerSet.scanContainerWithoutGap(containerID,
"Container reconciliation");
containerSet.scanContainerWithoutGap(containerID, "Container reconciliation");
sendICR(container);
}

Expand Down Expand Up @@ -1823,6 +1830,10 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
localOffset2Chunk.put(chunkOffset, chunkInfoProto);
// Update the treeWriter to reflect the current state of blocks and chunks on disk after successful
// chunk writes. If putBlockForClosedContainer fails, the container scanner will later update the
// Merkle tree to resolve any discrepancies.
treeWriter.addChunks(localID, true, chunkInfoProto);
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully ingested chunk at offset {} into block {} of container {} from peer {}",
chunkOffset, localID, containerID, peer);
Expand All @@ -1846,7 +1857,6 @@ private long reconcileChunksPerBlock(KeyValueContainer container, Pipeline pipel
List<ContainerProtos.ChunkInfo> allChunks = new ArrayList<>(localOffset2Chunk.values());
localBlockData.setChunks(allChunks);
putBlockForClosedContainer(container, localBlockData, maxBcsId, allChunksSuccessful);
treeWriter.addChunks(localID, true, allChunks);
// Invalidate the file handle cache, so new read requests get the new file if one was created.
chunkManager.finishWriteChunks(container, localBlockData);
}
Expand Down
Loading