Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public void reconstructECContainerGroup(long containerID,
}
metrics.incReconstructionTotal();
metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size());
// Trigger a container scan after successful reconstruction
context.getParent().getContainer().getContainerSet().scanContainer(containerID);
} catch (Exception e) {
// On any exception, delete the recovering containers.
metrics.incReconstructionFailsTotal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,150 +1568,153 @@ public void reconcileContainer(DNContainerOperationClient dnClient, Container<?>
KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
long containerID = containerData.getContainerID();

// Obtain the original checksum info before reconciling with any peers.
ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData);
if (!originalChecksumInfo.hasContainerMerkleTree()) {
// Try creating the merkle tree from RocksDB metadata if it is not present.
originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer);
}
// This holds our current most up-to-date checksum info that we are using for the container.
ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo;

int successfulPeerCount = 0;
Set<Long> allBlocksUpdated = new HashSet<>();
ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);

for (DatanodeDetails peer : peers) {
long numMissingBlocksRepaired = 0;
long numCorruptChunksRepaired = 0;
long numMissingChunksRepaired = 0;
// This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the
// next peer.
ContainerMerkleTreeWriter updatedTreeWriter =
new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());

LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}",
containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
// Data checksum updated after each peer reconciles.
long start = Instant.now().toEpochMilli();
ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
containerID, peer);
if (peerChecksumInfo == null) {
LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
try {
// Obtain the original checksum info before reconciling with any peers.
ContainerProtos.ContainerChecksumInfo originalChecksumInfo = checksumManager.read(containerData);
if (!originalChecksumInfo.hasContainerMerkleTree()) {
// Try creating the merkle tree from RocksDB metadata if it is not present.
originalChecksumInfo = updateAndGetContainerChecksumFromMetadata(kvContainer);
}
// This holds our current most up-to-date checksum info that we are using for the container.
ContainerProtos.ContainerChecksumInfo latestChecksumInfo = originalChecksumInfo;

int successfulPeerCount = 0;
Set<Long> allBlocksUpdated = new HashSet<>();
ByteBuffer chunkByteBuffer = ByteBuffer.allocate(chunkSize);

for (DatanodeDetails peer : peers) {
long numMissingBlocksRepaired = 0;
long numCorruptChunksRepaired = 0;
long numMissingChunksRepaired = 0;
// This will be updated as we do repairs with this peer, then used to write the updated tree for the diff with the
// next peer.
ContainerMerkleTreeWriter updatedTreeWriter =
new ContainerMerkleTreeWriter(latestChecksumInfo.getContainerMerkleTree());

LOG.info("Beginning reconciliation for container {} with peer {}. Current data checksum is {}",
containerID, peer, checksumToString(ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo)));
// Data checksum updated after each peer reconciles.
long start = Instant.now().toEpochMilli();
ContainerProtos.ContainerChecksumInfo peerChecksumInfo = dnClient.getContainerChecksumInfo(
containerID, peer);
continue;
}
if (peerChecksumInfo == null) {
LOG.warn("Cannot reconcile container {} with peer {} which has not yet generated a checksum",
containerID, peer);
continue;
}

ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
Pipeline pipeline = createSingleNodePipeline(peer);
ContainerDiffReport diffReport = checksumManager.diff(latestChecksumInfo, peerChecksumInfo);
Pipeline pipeline = createSingleNodePipeline(peer);

// Handle missing blocks
for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
long localID = missingBlock.getBlockID();
BlockID blockID = new BlockID(containerID, localID);
if (getBlockManager().blockExists(container, blockID)) {
LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
"present. Our container merkle tree is stale.", localID, containerID);
} else {
try {
long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
if (chunksInBlockRetrieved != 0) {
allBlocksUpdated.add(localID);
numMissingBlocksRepaired++;
}
} catch (IOException e) {
LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
containerID, e);
}
}
}

// Handle missing blocks
for (ContainerProtos.BlockMerkleTree missingBlock : diffReport.getMissingBlocks()) {
long localID = missingBlock.getBlockID();
BlockID blockID = new BlockID(containerID, localID);
if (getBlockManager().blockExists(container, blockID)) {
LOG.warn("Cannot reconcile block {} in container {} which was previously reported missing but is now " +
"present. Our container merkle tree is stale.", localID, containerID);
} else {
// Handle missing chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
long localID = entry.getKey();
try {
long chunksInBlockRetrieved = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, localID,
missingBlock.getChunkMerkleTreeList(), updatedTreeWriter, chunkByteBuffer);
if (chunksInBlockRetrieved != 0) {
long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (missingChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numMissingBlocksRepaired++;
numMissingChunksRepaired += missingChunksRepaired;
}
} catch (IOException e) {
LOG.error("Error while reconciling missing block for block {} in container {}", missingBlock.getBlockID(),
LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
}
}

// Handle missing chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getMissingChunks().entrySet()) {
long localID = entry.getKey();
try {
long missingChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (missingChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numMissingChunksRepaired += missingChunksRepaired;
// Handle corrupt chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
long localID = entry.getKey();
try {
long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (corruptChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numCorruptChunksRepaired += corruptChunksRepaired;
}
} catch (IOException e) {
LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
} catch (IOException e) {
LOG.error("Error while reconciling missing chunk for block {} in container {}", entry.getKey(),
containerID, e);
}
}

// Handle corrupt chunks
for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : diffReport.getCorruptChunks().entrySet()) {
long localID = entry.getKey();
try {
long corruptChunksRepaired = reconcileChunksPerBlock(kvContainer, pipeline, dnClient, entry.getKey(),
entry.getValue(), updatedTreeWriter, chunkByteBuffer);
if (corruptChunksRepaired != 0) {
allBlocksUpdated.add(localID);
numCorruptChunksRepaired += corruptChunksRepaired;
// Based on repairs done with this peer, write the updated merkle tree to the container.
// This updated tree will be used when we reconcile with the next peer.
ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo;
latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false);

// Log the results of reconciliation with this peer.
long duration = Instant.now().toEpochMilli() - start;
long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo);
long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
if (previousDataChecksum == latestDataChecksum) {
if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) {
// This condition should never happen.
LOG.error("Checksum of container was not updated but blocks were repaired.");
}
} catch (IOException e) {
LOG.error("Error while reconciling corrupt chunk for block {} in container {}", entry.getKey(),
containerID, e);
LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), duration);
} else {
LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" +
".\nMissing blocks repaired: {}/{}\n" +
"Missing chunks repaired: {}/{}\n" +
"Corrupt chunks repaired: {}/{}\n" +
"Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum),
numMissingBlocksRepaired, diffReport.getMissingBlocks().size(),
numMissingChunksRepaired, diffReport.getMissingChunks().size(),
numCorruptChunksRepaired, diffReport.getCorruptChunks().size(),
duration);
}
}

// Based on repairs done with this peer, write the updated merkle tree to the container.
// This updated tree will be used when we reconcile with the next peer.
ContainerProtos.ContainerChecksumInfo previousChecksumInfo = latestChecksumInfo;
latestChecksumInfo = updateAndGetContainerChecksum(container, updatedTreeWriter, false);
ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
successfulPeerCount++;
}

// Log the results of reconciliation with this peer.
long duration = Instant.now().toEpochMilli() - start;
long previousDataChecksum = ContainerChecksumTreeManager.getDataChecksum(previousChecksumInfo);
// Log a summary after reconciling with all peers.
long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo);
long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
if (previousDataChecksum == latestDataChecksum) {
if (numCorruptChunksRepaired != 0 || numMissingBlocksRepaired != 0 || numMissingChunksRepaired != 0) {
// This condition should never happen.
LOG.error("Checksum of container was not updated but blocks were repaired.");
}
LOG.info("Container {} reconciled with peer {}. Data checksum {} was not updated. Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), duration);
if (originalDataChecksum == latestDataChecksum) {
LOG.info("Completed reconciliation for container {} with {}/{} peers. " +
"Original data checksum {} was not updated",
containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum));
} else {
LOG.warn("Container {} reconciled with peer {}. Data checksum updated from {} to {}" +
".\nMissing blocks repaired: {}/{}\n" +
"Missing chunks repaired: {}/{}\n" +
"Corrupt chunks repaired: {}/{}\n" +
"Time taken: {} ms",
containerID, peer, checksumToString(previousDataChecksum), checksumToString(latestDataChecksum),
numMissingBlocksRepaired, diffReport.getMissingBlocks().size(),
numMissingChunksRepaired, diffReport.getMissingChunks().size(),
numCorruptChunksRepaired, diffReport.getCorruptChunks().size(),
duration);
}

ContainerLogger.logReconciled(container.getContainerData(), previousDataChecksum, peer);
successfulPeerCount++;
}

// Log a summary after reconciling with all peers.
long originalDataChecksum = ContainerChecksumTreeManager.getDataChecksum(originalChecksumInfo);
long latestDataChecksum = ContainerChecksumTreeManager.getDataChecksum(latestChecksumInfo);
if (originalDataChecksum == latestDataChecksum) {
LOG.info("Completed reconciliation for container {} with {}/{} peers. Original data checksum {} was not updated",
containerID, successfulPeerCount, peers.size(), checksumToString(latestDataChecksum));
} else {
LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. Data checksum " +
"updated from {} to {}", containerID, successfulPeerCount, peers.size(), allBlocksUpdated.size(),
checksumToString(originalDataChecksum), checksumToString(latestDataChecksum));
if (LOG.isDebugEnabled()) {
LOG.debug("Blocks updated in container {} after reconciling with {} peers: {}", containerID,
successfulPeerCount, allBlocksUpdated);
LOG.warn("Completed reconciliation for container {} with {}/{} peers. {} blocks were updated. Data checksum " +
"updated from {} to {}", containerID, successfulPeerCount, peers.size(), allBlocksUpdated.size(),
checksumToString(originalDataChecksum), checksumToString(latestDataChecksum));
if (LOG.isDebugEnabled()) {
LOG.debug("Blocks updated in container {} after reconciling with {} peers: {}", containerID,
successfulPeerCount, allBlocksUpdated);
}
}
} finally {
// Trigger on demand scanner, which will build the merkle tree based on the newly ingested data.
containerSet.scanContainerWithoutGap(containerID,
"Container reconciliation");
sendICR(container);
}

// Trigger on demand scanner, which will build the merkle tree based on the newly ingested data.
containerSet.scanContainerWithoutGap(containerID,
"Container reconciliation");
sendICR(container);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,14 @@ public Container importContainer(
/**
 * Exports the container with the given ID by delegating to the handler
 * looked up for {@code type}.
 *
 * <p>The export is attempted exactly once. If the handler fails with an
 * {@link IOException}, a scan of the container is requested through
 * {@code containerSet} and the same exception is rethrown to the caller.
 *
 * @param type container type used to look up the handler
 * @param containerId ID of the container to export
 * @param outputStream stream handed to the handler to receive the export
 * @param packer packer handed to the handler
 * @throws IOException if the handler's export fails
 */
public void exportContainer(final ContainerType type,
    final long containerId, final OutputStream outputStream,
    final TarContainerPacker packer) throws IOException {
  try {
    // Single, guarded export call: an unguarded call ahead of this block
    // would write to outputStream twice and bypass the scan on failure.
    handlers.get(type).exportContainer(
        containerSet.getContainer(containerId), outputStream, packer);
  } catch (IOException e) {
    // If export fails, then trigger a scan for the container
    containerSet.scanContainer(containerId);
    throw e;
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public void importContainer(long containerID, Path tarFilePath,
targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
containerSet.addContainerByOverwriteMissingContainer(container);
containerSet.scanContainer(containerID, "Imported container");
} catch (Exception e) {
// Trigger a volume scan if the import failed.
StorageVolumeUtil.onFailure(containerData.getVolume());
throw e;
}
} finally {
importContainerProgress.remove(containerID);
Expand Down
Loading
Loading