diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
index 65e124eaf5ec..5046a448959b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -29,7 +29,7 @@
  */
 public class BadDataLocationException extends IOException {
 
-  private List<DatanodeDetails> failedLocations = new ArrayList<>();
+  private final List<DatanodeDetails> failedLocations = new ArrayList<>();
   private int failedLocationIndex;
 
   public BadDataLocationException(DatanodeDetails dn) {
@@ -53,6 +53,13 @@ public BadDataLocationException(DatanodeDetails dn, Throwable ex) {
     failedLocations.add(dn);
   }
 
+  public BadDataLocationException(int failedIndex,
+      Throwable ex, List<DatanodeDetails> failedLocations) {
+    super(ex);
+    failedLocationIndex = failedIndex;
+    this.failedLocations.addAll(failedLocations);
+  }
+
   public BadDataLocationException(DatanodeDetails dn, int failedIndex,
       Throwable ex) {
     super(ex);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index dc354198ca6d..8ad8f8851e81 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -334,7 +334,7 @@ public synchronized int read(ByteBuffer byteBuffer) throws IOException {
     }
   }
 
-  private boolean shouldRetryFailedRead(int failedIndex) {
+  protected boolean shouldRetryFailedRead(int failedIndex) {
     Deque<DatanodeDetails> spareLocations = spareDataLocations.get(failedIndex);
     if (spareLocations != null && spareLocations.size() > 0) {
       failedLocations.add(dataLocations[failedIndex]);
@@ -470,7 +470,7 @@ protected synchronized void closeStreams() {
     seeked = true;
   }
 
-  private void closeStream(int i) {
+  protected void closeStream(int i) {
     if (blockStreams[i] != null) {
       try {
         blockStreams[i].close();
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 9658fb784ddc..492bf33a100b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -42,6 +42,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
@@ -610,6 +611,31 @@ protected void loadDataBuffersFromStream()
   }
 
   private void readIntoBuffer(int ind, ByteBuffer buf) throws IOException {
+    List<DatanodeDetails> failedLocations = new LinkedList<>();
+    while (true) {
+      int currentBufferPosition = buf.position();
+      try {
+        readFromCurrentLocation(ind, buf);
+        break;
+      } catch (IOException e) {
+        DatanodeDetails failedLocation = getDataLocations()[ind];
+        failedLocations.add(failedLocation);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: read [{}] failed from {} due to {}", this,
+              ind, failedLocation, e.getMessage());
+        }
+        closeStream(ind);
+        if (shouldRetryFailedRead(ind)) {
+          buf.position(currentBufferPosition);
+        } else {
+          throw new BadDataLocationException(ind, e, failedLocations);
+        }
+      }
+    }
+  }
+
+  private void readFromCurrentLocation(int ind, ByteBuffer buf)
+      throws IOException {
     BlockExtendedInputStream stream = getOrOpenStream(ind);
     seekStreamIfNecessary(stream, 0);
     while (buf.hasRemaining()) {
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
index 0fe5886f1b7d..db08bd73439a 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
@@ -223,7 +223,7 @@ public static class TestBlockInputStreamFactory implements
       new LinkedHashMap<>();
 
   private List<ByteBuffer> blockStreamData;
   // List of EC indexes that should fail immediately on read
-  private List<Integer> failIndexes = new ArrayList<>();
+  private final List<Integer> failIndexes = new ArrayList<>();
 
   private Pipeline currentPipeline;
@@ -249,8 +249,9 @@ public synchronized void setCurrentPipeline(Pipeline pipeline) {
     this.currentPipeline = pipeline;
   }
 
-  public synchronized void setFailIndexes(List<Integer> fail) {
-    failIndexes.addAll(fail);
+  // Fail each of the given indexes once.
+  public synchronized void setFailIndexes(Integer... fail) {
+    failIndexes.addAll(Arrays.asList(fail));
   }
 
   public synchronized BlockExtendedInputStream create(
@@ -264,7 +265,7 @@ public synchronized BlockExtendedInputStream create(
     TestBlockInputStream stream = new TestBlockInputStream(
         blockInfo.getBlockID(), blockInfo.getLength(),
         blockStreamData.get(repInd - 1), repInd);
-    if (failIndexes.contains(repInd)) {
+    if (failIndexes.remove(Integer.valueOf(repInd))) {
       stream.setShouldError(true);
     }
     blockStreams.put(repInd, stream);
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
index 7ad6d3e185fc..62d8c2d76023 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -489,6 +490,71 @@ public void testErrorThrownIfBlockNotLongEnough() throws IOException {
     }
   }
 
+  @Test
+  void testNoErrorIfSpareLocationToRead() throws IOException {
+    int chunkSize = repConfig.getEcChunkSize();
+    int blockLength = chunkSize * 3 - 1;
+
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    ECStreamTestUtil
+        .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    // The block length is less than a full stripe: chunks 1 and 2 are full
+    // and chunk 3 is one byte short. Each location map below keeps enough
+    // replicas to recover the data after failing over to the spare.
+
+    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+    // Two data missing
+    locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
+    // Two data missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
+    // One data missing - the last one
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
+    // One data and one parity missing
+    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
+    // One data and one parity missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
+    // No indexes missing
+    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
+
+    DatanodeDetails spare = MockDatanodeDetails.randomDatanodeDetails();
+
+    for (Map<DatanodeDetails, Integer> dnMap : locations) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+
+      // This index fails on first read, but has a spare replica.
+      int failing = dnMap.values().iterator().next();
+      streamFactory.setFailIndexes(failing);
+      dnMap.put(spare, failing);
+
+      BlockLocationInfo keyInfo =
+          ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+      dataGen = new SplittableRandom(randomSeed);
+      try (ECBlockReconstructedStripeInputStream ecb =
+          createInputStream(keyInfo)) {
+        int read = ecb.read(bufs);
+        Assertions.assertEquals(blockLength, read);
+        ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
+        ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
+        // Check the underlying streams have been fully read:
+        for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+          Assertions.assertEquals(0, bis.getRemaining());
+        }
+        Assertions.assertEquals(blockLength, ecb.getPos());
+        clearBuffers(bufs);
+        // A further read should give EOF.
+        read = ecb.read(bufs);
+        Assertions.assertEquals(-1, read);
+      }
+    }
+  }
+
   @Test
   public void testSeek() throws IOException {
     // Generate the input data for 3 full stripes and generate the parity
@@ -688,7 +754,7 @@ public void testAllLocationsFailOnFirstRead() throws IOException {
     streamFactory = new TestBlockInputStreamFactory();
     addDataStreamsToFactory(dataBufs, parity);
     // Fail all the indexes containing data on their first read.
-    streamFactory.setFailIndexes(indexesToList(1, 4, 5));
+    streamFactory.setFailIndexes(1, 4, 5);
     // The locations contain the padded indexes, as will often be the case
     // when containers are reported by SCM.
     Map<DatanodeDetails, Integer> dnMap =
@@ -759,14 +825,6 @@ private ECBlockReconstructedStripeInputStream createInputStream(
         null, null, streamFactory, bufferPool, ecReconstructExecutor);
   }
 
-  private List<Integer> indexesToList(int... indexes) {
-    List<Integer> list = new ArrayList<>();
-    for (int i : indexes) {
-      list.add(i);
-    }
-    return list;
-  }
-
   private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
     List<ByteBuffer> dataStreams = new ArrayList<>();
     for (ByteBuffer b : data) {
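
Note for reviewers: the heart of this patch is the new retry loop in readIntoBuffer(). Each failed read records the bad datanode, closes the stream, and then either rewinds the buffer and retries (shouldRetryFailedRead() promotes a spare replica into dataLocations) or throws BadDataLocationException carrying the failed index plus every location that was tried. The sketch below is a minimal, self-contained illustration of that control flow; SpareReplicaRetryDemo, Location, and BadLocationException are hypothetical stand-ins for this sketch, not Ozone classes.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Standalone sketch of the retry-from-spare-replica pattern above. */
public final class SpareReplicaRetryDemo {

  /** Stand-in for a replica location (DatanodeDetails in the real code). */
  static final class Location {
    private final String name;
    Location(String name) {
      this.name = name;
    }
    @Override
    public String toString() {
      return name;
    }
  }

  /** Stand-in for BadDataLocationException: carries all failed locations. */
  static final class BadLocationException extends IOException {
    private static final long serialVersionUID = 1L;
    private final List<Location> failedLocations;
    BadLocationException(Throwable cause, List<Location> failedLocations) {
      super(cause);
      this.failedLocations = failedLocations;
    }
    List<Location> getFailedLocations() {
      return failedLocations;
    }
  }

  private final Deque<Location> spares = new ArrayDeque<>();
  private Location current;

  SpareReplicaRetryDemo(Location first, Location... spareLocations) {
    current = first;
    for (Location spare : spareLocations) {
      spares.add(spare);
    }
  }

  /** Mirrors readIntoBuffer: retry from spares, rewinding partial reads. */
  void readIntoBuffer(ByteBuffer buf) throws IOException {
    List<Location> failed = new ArrayList<>();
    while (true) {
      int positionBeforeAttempt = buf.position();
      try {
        readFromCurrentLocation(buf);
        return;
      } catch (IOException e) {
        failed.add(current);
        if (spares.isEmpty()) {
          // No replicas left - surface every location that was tried.
          throw new BadLocationException(e, failed);
        }
        // Discard the partial read and retry against the next spare.
        buf.position(positionBeforeAttempt);
        current = spares.poll();
      }
    }
  }

  /** Simulated read: only the location named "good" succeeds. */
  private void readFromCurrentLocation(ByteBuffer buf) throws IOException {
    buf.put((byte) 1); // partial progress before the failure surfaces
    if (!"good".equals(current.toString())) {
      throw new IOException("read failed at " + current);
    }
  }

  public static void main(String[] args) throws IOException {
    SpareReplicaRetryDemo demo = new SpareReplicaRetryDemo(
        new Location("bad-1"), new Location("bad-2"), new Location("good"));
    ByteBuffer buf = ByteBuffer.allocate(8);
    demo.readIntoBuffer(buf);
    // Two failed attempts were rewound, so only one byte remains: prints 1.
    System.out.println("bytes in buffer: " + buf.position());
  }
}

The rewind to the pre-attempt position is the subtle part: a replica can fail mid-read after copying some bytes into the destination buffer, and without resetting the position a successful retry would append to that partial data rather than replace it.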