@@ -289,10 +289,18 @@ private static void validate(ContainerCommandResponseProto response)
throw new IllegalArgumentException("Not GetBlock: response=" + response);
}
final GetBlockResponseProto b = response.getGetBlock();
final long blockLength = b.getBlockData().getSize();
final List<ChunkInfo> chunks = b.getBlockData().getChunksList();
for (int i = 0; i < chunks.size(); i++) {
final ChunkInfo c = chunks.get(i);
if (c.getLen() <= 0) {
// HDDS-10682 caused an empty chunk to be written to the end of some EC blocks. Without this
// exception, such blocks would fail validation and become unreadable. In the EC case, the empty
// chunk is always the last chunk and its offset equals the block length, so this case can be
// safely ignored instead of failing.
if (c.getLen() <= 0 && i == chunks.size() - 1 && c.getOffset() == blockLength) {
DatanodeBlockID blockID = b.getBlockData().getBlockID();
LOG.warn("The last chunk is empty for container/block {}/{} and its offset equals the block " +
"length. This is likely due to HDDS-10682 and is safe to ignore.", blockID.getContainerID(), blockID.getLocalID());
} else if (c.getLen() <= 0) {
throw new IOException("Failed to get chunkInfo["
+ i + "]: len == " + c.getLen());
}
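
The rewritten check tolerates exactly one shape of zero-length chunk: the last chunk in the list, at an offset equal to the block length, which is the artifact HDDS-10682 left on some EC blocks. Any other zero-length chunk still fails validation. A minimal sketch of the rule in isolation, using a hypothetical helper with plain primitives rather than the proto types:

// Hypothetical helper restating the tolerance rule above. `index` is the
// chunk's position in the list and `chunkCount` is the list size.
static boolean isTolerableEmptyChunk(long chunkLen, long chunkOffset,
    int index, int chunkCount, long blockLength) {
  // Accept only a trailing empty chunk whose offset equals the block
  // length (the HDDS-10682 artifact); any other empty chunk is an error.
  return chunkLen <= 0 && index == chunkCount - 1 && chunkOffset == blockLength;
}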
@@ -58,6 +58,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -247,24 +248,15 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
int dataLocs = ECBlockInputStreamProxy
.expectedDataLocations(repConfig, safeBlockGroupLength);
List<Integer> toReconstructIndexes = new ArrayList<>();
List<Integer> notReconstructIndexes = new ArrayList<>();
for (Integer index : missingContainerIndexes) {
if (index <= dataLocs || index > repConfig.getData()) {
toReconstructIndexes.add(index);
} else {
// Don't need to be reconstructed, but we do need a stream to write
// the block data to.
notReconstructIndexes.add(index);
}
// else padded indexes.
}

// Looks like we don't need to reconstruct any missing blocks in this block
// group, likely because the block group had only padding blocks in the
// missing locations.
if (toReconstructIndexes.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping the reconstruction for the block: "
+ blockLocationInfo.getBlockID() + ". In the missing locations: "
+ missingContainerIndexes
+ ", this block group has only padded blocks.");
}
return;
}

try (ECBlockReconstructedStripeInputStream sis
@@ -276,71 +268,82 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
ECBlockOutputStream[] emptyBlockStreams =
new ECBlockOutputStream[notReconstructIndexes.size()];
ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
try {
// Create streams and buffers for all indexes that need to be reconstructed
for (int i = 0; i < toReconstructIndexes.size(); i++) {
int replicaIndex = toReconstructIndexes.get(i);
DatanodeDetails datanodeDetails =
targetMap.get(replicaIndex);
targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
datanodeDetails, repConfig, replicaIndex
);
DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, datanodeDetails, repConfig, replicaIndex);
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
// Make sure it's clean. We don't want to reuse a buffer that was
// erroneously returned to the pool without being cleared.
bufs[i].clear();
}
// Then create a stream for each index that doesn't need to be reconstructed, but still needs
// a stream to write the empty block data to.
for (int i = 0; i < notReconstructIndexes.size(); i++) {
int replicaIndex = notReconstructIndexes.get(i);
DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
emptyBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, datanodeDetails, repConfig, replicaIndex);
}

sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
try {
readLen = sis.recoverChunks(bufs);
Set<Integer> failedIndexes = sis.getFailedIndexes();
if (!failedIndexes.isEmpty()) {
// There was a problem reading some of the block indexes, but we
// did not get an exception, as there must have been spare indexes
// to recover from. Therefore we log the block group details in the
// same way as in the exception case below.
if (toReconstructIndexes.size() > 0) {
sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
try {
readLen = sis.recoverChunks(bufs);
Set<Integer> failedIndexes = sis.getFailedIndexes();
if (!failedIndexes.isEmpty()) {
// There was a problem reading some of the block indexes, but we
// did not get an exception, as there must have been spare indexes
// to recover from. Therefore we log the block group details in the
// same way as in the exception case below.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
}
} catch (IOException e) {
// An exception here could be due to a transient issue that causes
// the block read to fail during reconstruction, but we have also
// seen cases where containers don't have the blocks they should
// have, or the block chunks are the wrong length. To help debug
// such cases, on error we log the details of the block group
// length on each source, along with their chunk lists and chunk
// lengths.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
throw e;
}
} catch (IOException e) {
// An exception here could be due to a transient issue that causes
// the block read to fail during reconstruction, but we have also
// seen cases where containers don't have the blocks they should
// have, or the block chunks are the wrong length. To help debug
// such cases, on error we log the details of the block group
// length on each source, along with their chunk lists and chunk
// lengths.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
throw e;
}
// TODO: can be submitted in parallel
for (int i = 0; i < bufs.length; i++) {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
future = targetBlockStreams[i].write(bufs[i]);
checkFailures(targetBlockStreams[i], future);
bufs[i].clear();
// TODO: can be submitted in parallel
for (int i = 0; i < bufs.length; i++) {
if (bufs[i].remaining() != 0) {
// Only write buffers that contain data; writing an empty buffer
// would add an empty chunk to the end of the block.
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
future = targetBlockStreams[i].write(bufs[i]);
checkFailures(targetBlockStreams[i], future);
}
bufs[i].clear();
}
length -= readLen;
}
length -= readLen;
}

for (ECBlockOutputStream targetStream : targetBlockStreams) {
targetStream.executePutBlock(true, true,
blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream,
targetStream.getCurrentPutBlkResponseFuture());
List<ECBlockOutputStream> allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams));
allStreams.addAll(Arrays.asList(emptyBlockStreams));
for (ECBlockOutputStream targetStream : allStreams) {
targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
}
} finally {
for (ByteBuffer buf : bufs) {
byteBufferPool.putBuffer(buf);
}
IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
IOUtils.cleanupWithLogger(LOG, emptyBlockStreams);
}
}
}
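
To make the restructured flow concrete: with RS(data, parity), replica indexes 1..data hold data and data+1..data+parity hold parity, and expectedDataLocations returns how many data indexes actually contain bytes for the safe block group length. A missing index is reconstructed only if it is a populated data index or a parity index; a padded data index now gets an entry in emptyBlockStreams so putBlock still runs for it, while no chunk data (in particular, no empty chunk) is written. A self-contained sketch of the partition, with assumed values for an RS(3, 2) group whose data fits in a single chunk:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Worked example with assumed values, not the production code: partition the
// missing replica indexes of an RS(3, 2) block group whose data fits in one
// chunk, so only data index 1 holds bytes (dataLocs == 1).
public class MissingIndexPartition {
  public static void main(String[] args) {
    final int data = 3;      // data indexes 1..3; parity indexes 4..5
    final int dataLocs = 1;  // data indexes that actually hold bytes
    final List<Integer> missing = Arrays.asList(2, 4);

    final List<Integer> toReconstruct = new ArrayList<>();
    final List<Integer> notReconstruct = new ArrayList<>();
    for (int index : missing) {
      if (index <= dataLocs || index > data) {
        // Populated data or parity index: its bytes must be rebuilt.
        toReconstruct.add(index);
      } else {
        // Padded data index: nothing to rebuild, but it still needs a
        // stream so putBlock can record the empty block on the target.
        notReconstruct.add(index);
      }
    }
    System.out.println(toReconstruct);   // [4]
    System.out.println(notReconstruct);  // [2]
  }
}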
@@ -135,7 +135,9 @@ public class TestContainerCommandsEC {
private static final int EC_CHUNK_SIZE = 1024 * 1024;
private static final int STRIPE_DATA_SIZE = EC_DATA * EC_CHUNK_SIZE;
private static final int NUM_DN = EC_DATA + EC_PARITY + 3;
private static byte[][] inputChunks = new byte[EC_DATA][EC_CHUNK_SIZE];
// Data slots are EC_DATA + 1 so we can generate enough data to have a full stripe
// plus one extra chunk.
private static byte[][] inputChunks = new byte[EC_DATA + 1][EC_CHUNK_SIZE];

// Each key size will be in range [min, max), min inclusive, max exclusive
private static final int[][] KEY_SIZE_RANGES =
@@ -614,12 +616,19 @@ void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
testECReconstructionCoordinator(missingIndexes, 3);
}

@Test
void testECReconstructionWithPartialStripe()
throws Exception {
testECReconstructionCoordinator(ImmutableList.of(4, 5), 1);
@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithPartialStripe(List<Integer> missingIndexes)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 1);
}

@ParameterizedTest
@MethodSource("recoverableMissingIndexes")
void testECReconstructionCoordinatorWithFullAndPartialStripe(List<Integer> missingIndexes)
throws Exception {
testECReconstructionCoordinator(missingIndexes, 4);
}

static Stream<List<Integer>> recoverableMissingIndexes() {
return Stream
@@ -895,18 +904,19 @@ private void checkBlockData(
reconstructedBlockData) {

for (int i = 0; i < blockData.length; i++) {
Assert.assertEquals(blockData[i].getBlockID(), reconstructedBlockData[i].getBlockID());
Assert.assertEquals(blockData[i].getSize(), reconstructedBlockData[i].getSize());
Assert.assertEquals(blockData[i].getMetadata(), reconstructedBlockData[i].getMetadata());
List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
blockData[i].getChunks();
List<ContainerProtos.ChunkInfo> newBlockDataChunks =
reconstructedBlockData[i].getChunks();
for (int j = 0; j < oldBlockDataChunks.size(); j++) {
ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
if (chunkInfo.getLen() == 0) {
// let's ignore the empty chunks
continue;
}
Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
}
// Ensure there are no extra chunks in the reconstructed block
Assert.assertEquals(oldBlockDataChunks.size(), newBlockDataChunks.size());
}
}
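
Before the fix, reconstruction appended a zero-length chunk to the rebuilt block, making the reconstructed chunk list one entry longer than the source's; the added size assertion turns that regression into a test failure, while the loop still skips zero-length chunks in the source data. A sketch of the invariant as a standalone helper (hypothetical name, same ChunkInfo/Assert types as the test):

// Hypothetical helper capturing the invariant enforced above: non-empty
// source chunks must match position for position, and the reconstructed
// block must not contain extra chunks.
private static void assertChunksReconstructed(
    List<ContainerProtos.ChunkInfo> source,
    List<ContainerProtos.ChunkInfo> reconstructed) {
  for (int j = 0; j < source.size(); j++) {
    if (source.get(j).getLen() == 0) {
      continue; // legacy empty chunk in the source; skip the comparison
    }
    Assert.assertEquals(source.get(j), reconstructed.get(j));
  }
  Assert.assertEquals(source.size(), reconstructed.size());
}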
