BadDataLocationException.java
@@ -29,7 +29,7 @@
  */
 public class BadDataLocationException extends IOException {
 
-  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+  private final List<DatanodeDetails> failedLocations = new ArrayList<>();
   private int failedLocationIndex;
 
   public BadDataLocationException(DatanodeDetails dn) {
@@ -53,6 +53,13 @@ public BadDataLocationException(DatanodeDetails dn, Throwable ex) {
     failedLocations.add(dn);
   }
 
+  public BadDataLocationException(int failedIndex,
+      Throwable ex, List<DatanodeDetails> failedLocations) {
+    super(ex);
+    failedLocationIndex = failedIndex;
+    this.failedLocations.addAll(failedLocations);
+  }
+
   public BadDataLocationException(DatanodeDetails dn, int failedIndex,
       Throwable ex) {
     super(ex);
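A minimal sketch of how the new list-carrying constructor might be consumed; the ECReader type and the getFailedLocations() getter are assumptions for illustration, not shown in this diff:

    // Hypothetical consumer: retry an EC read, excluding every replica the
    // exception reports as bad. ECReader is an assumed handle, not a real API.
    Set<DatanodeDetails> readAvoidingBadLocations(ECReader reader, ByteBuffer[] bufs)
        throws IOException {
      Set<DatanodeDetails> excluded = new HashSet<>();
      try {
        reader.read(bufs);
      } catch (BadDataLocationException e) {
        // The new constructor carries all failed replicas, not just one, so a
        // single catch can exclude the whole set before the next attempt.
        excluded.addAll(e.getFailedLocations()); // getter assumed for the field above
      }
      return excluded;
    }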
ECBlockInputStream.java
@@ -334,7 +334,7 @@ public synchronized int read(ByteBuffer byteBuffer) throws IOException {
     }
   }
 
-  private boolean shouldRetryFailedRead(int failedIndex) {
+  protected boolean shouldRetryFailedRead(int failedIndex) {
     Deque<DatanodeDetails> spareLocations = spareDataLocations.get(failedIndex);
     if (spareLocations != null && spareLocations.size() > 0) {
       failedLocations.add(dataLocations[failedIndex]);
@@ -470,7 +470,7 @@ protected synchronized void closeStreams() {
     seeked = true;
   }
 
-  private void closeStream(int i) {
+  protected void closeStream(int i) {
     if (blockStreams[i] != null) {
       try {
         blockStreams[i].close();
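The two visibility changes above let a subclass own the retry loop; the next file shows ECBlockReconstructedStripeInputStream doing exactly that. A schematic sketch of the pattern, with illustrative names only:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Schematic only; the class and method names here are illustrative.
    abstract class BaseBlockReader {
      protected void closeStream(int i) { /* close and clear the stream */ }
      protected boolean shouldRetryFailedRead(int i) { return false; /* any spare? */ }
      protected abstract void readOnce(int i, ByteBuffer buf) throws IOException;
    }

    class StripeReader extends BaseBlockReader {
      @Override
      protected void readOnce(int i, ByteBuffer buf) throws IOException {
        // read from the current location for index i ...
      }

      void readWithRetry(int index, ByteBuffer buf) throws IOException {
        while (true) {
          try {
            readOnce(index, buf);
            return;
          } catch (IOException e) {
            closeStream(index);                  // callable now that it is protected
            if (!shouldRetryFailedRead(index)) { // likewise
              throw e;
            }
          }
        }
      }
    }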
ECBlockReconstructedStripeInputStream.java
@@ -42,6 +42,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -610,6 +611,31 @@ protected void loadDataBuffersFromStream()
   }
 
   private void readIntoBuffer(int ind, ByteBuffer buf) throws IOException {
+    List<DatanodeDetails> failedLocations = new LinkedList<>();
+    while (true) {
+      int currentBufferPosition = buf.position();
+      try {
+        readFromCurrentLocation(ind, buf);
+        break;
+      } catch (IOException e) {
+        DatanodeDetails failedLocation = getDataLocations()[ind];
+        failedLocations.add(failedLocation);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: read [{}] failed from {} due to {}", this,
+              ind, failedLocation, e.getMessage());
+        }
+        closeStream(ind);
+        if (shouldRetryFailedRead(ind)) {
+          buf.position(currentBufferPosition);
+        } else {
+          throw new BadDataLocationException(ind, e, failedLocations);
+        }
+      }
+    }
+  }
+
+  private void readFromCurrentLocation(int ind, ByteBuffer buf)
+      throws IOException {
     BlockExtendedInputStream stream = getOrOpenStream(ind);
     seekStreamIfNecessary(stream, 0);
     while (buf.hasRemaining()) {
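One subtlety in the new readIntoBuffer loop: the buffer position is captured before each attempt and restored when a location fails, so bytes partially written by the bad replica are discarded before the read is retried elsewhere. A standalone demo of that ByteBuffer idiom:

    import java.nio.ByteBuffer;

    // Demo of the rewind trick used above: remember the buffer position before
    // a read attempt, and restore it if the attempt fails so partial data from
    // the bad replica is discarded before retrying.
    public class RewindDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        int mark = buf.position();          // position before the attempt: 0
        buf.put((byte) 1).put((byte) 2);    // simulate a partial, failed read
        buf.position(mark);                 // discard the partial data
        System.out.println(buf.position()); // prints 0: ready for the retry
      }
    }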
ECStreamTestUtil.java
@@ -223,7 +223,7 @@ public static class TestBlockInputStreamFactory implements
         new LinkedHashMap<>();
     private List<ByteBuffer> blockStreamData;
     // List of EC indexes that should fail immediately on read
-    private List<Integer> failIndexes = new ArrayList<>();
+    private final List<Integer> failIndexes = new ArrayList<>();
 
     private Pipeline currentPipeline;
 
@@ -249,8 +249,9 @@ public synchronized void setCurrentPipeline(Pipeline pipeline) {
       this.currentPipeline = pipeline;
     }
 
-    public synchronized void setFailIndexes(List<Integer> fail) {
-      failIndexes.addAll(fail);
+    // fail each index in the list once
+    public synchronized void setFailIndexes(Integer... fail) {
+      failIndexes.addAll(Arrays.asList(fail));
     }
 
     public synchronized BlockExtendedInputStream create(
@@ -264,7 +265,7 @@ public synchronized BlockExtendedInputStream create(
       TestBlockInputStream stream = new TestBlockInputStream(
           blockInfo.getBlockID(), blockInfo.getLength(),
           blockStreamData.get(repInd - 1), repInd);
-      if (failIndexes.contains(repInd)) {
+      if (failIndexes.remove(Integer.valueOf(repInd))) {
        stream.setShouldError(true);
       }
       blockStreams.put(repInd, stream);
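Switching from contains to remove(Integer.valueOf(repInd)) makes each listed index fail exactly once, and the boxing matters: with a plain int, List.remove(int) removes by position rather than by value. A small self-contained demo (class name is illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Why remove(Integer.valueOf(repInd)) rather than remove(repInd): boxing
    // selects List.remove(Object), which removes the first matching element
    // and reports whether it was present, giving the "fail once" semantics.
    public class RemoveOverloadDemo {
      public static void main(String[] args) {
        List<Integer> fail = new ArrayList<>(Arrays.asList(4, 2));
        System.out.println(fail.remove(Integer.valueOf(2))); // true: value 2 removed
        System.out.println(fail.remove(Integer.valueOf(2))); // false: already gone
        // fail.remove(2) would call List.remove(int) and try to remove the
        // element at position 2, throwing IndexOutOfBoundsException here.
      }
    }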
TestECBlockReconstructedStripeInputStream.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -489,6 +490,71 @@ public void testErrorThrownIfBlockNotLongEnough() throws IOException {
     }
   }
 
+  @Test
+  void testNoErrorIfSpareLocationToRead() throws IOException {
+    int chunkSize = repConfig.getEcChunkSize();
+    int blockLength = chunkSize * 3 - 1;
+
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    // The length is one byte short of a full stripe, so chunks 1 and 2 are
+    // full and chunk 3 is one byte short. In each scenario below, the first
+    // index in the map fails on read but has a spare replica to fall back on.
+
+    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+    // Two data missing
+    locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
+    // Two data missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
+    // One data missing - the last one
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
+    // One data and one parity missing
+    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
+    // One data and one parity missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
+    // No indexes missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
+
+    DatanodeDetails spare = MockDatanodeDetails.randomDatanodeDetails();
+
+    for (Map<DatanodeDetails, Integer> dnMap : locations) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+
+      // This index fails, but has a spare replica
+      int failing = dnMap.values().iterator().next();
+      streamFactory.setFailIndexes(failing);
+      dnMap.put(spare, failing);
+
+      BlockLocationInfo keyInfo =
+          ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+      dataGen = new SplittableRandom(randomSeed);
+      try (ECBlockReconstructedStripeInputStream ecb =
+          createInputStream(keyInfo)) {
+        int read = ecb.read(bufs);
+        Assertions.assertEquals(blockLength, read);
+        ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
+        // Check the underlying streams have been fully consumed
+        for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+          Assertions.assertEquals(0, bis.getRemaining());
+        }
+        Assertions.assertEquals(blockLength, ecb.getPos());
+        clearBuffers(bufs);
+        // A further read should give EOF
+        read = ecb.read(bufs);
+        Assertions.assertEquals(-1, read);
+      }
+    }
+  }
+
   @Test
   public void testSeek() throws IOException {
     // Generate the input data for 3 full stripes and generate the parity
@@ -688,7 +754,7 @@ public void testAllLocationsFailOnFirstRead() throws IOException {
     streamFactory = new TestBlockInputStreamFactory();
     addDataStreamsToFactory(dataBufs, parity);
     // Fail all the indexes containing data on their first read.
-    streamFactory.setFailIndexes(indexesToList(1, 4, 5));
+    streamFactory.setFailIndexes(1, 4, 5);
     // The locations contain the padded indexes, as will often be the case
     // when containers are reported by SCM.
     Map<DatanodeDetails, Integer> dnMap =
@@ -759,14 +825,6 @@ private ECBlockReconstructedStripeInputStream createInputStream(
         null, null, streamFactory, bufferPool, ecReconstructExecutor);
   }
 
-  private List<Integer> indexesToList(int... indexes) {
-    List<Integer> list = new ArrayList<>();
-    for (int i : indexes) {
-      list.add(i);
-    }
-    return list;
-  }
-
   private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
     List<ByteBuffer> dataStreams = new ArrayList<>();
     for (ByteBuffer b : data) {