diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 70bd3847c2eb..89be7839ac0f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -36,6 +36,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.function.Function;
 
 /**
@@ -182,12 +183,43 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
           HddsProtos.ReplicationFactor.ONE),
           blkInfo, pipeline,
           blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
-          refreshFunction);
+          ecPipelineRefreshFunction(locationIndex + 1, refreshFunction));
       blockStreams[locationIndex] = stream;
     }
     return stream;
   }
 
+  /**
+   * Returns a function that builds a Standalone pipeline corresponding
+   * to the given replicaIndex, based on the EC pipeline fetched from SCM.
+   * @param replicaIndex 1-based index of the internal block replica
+   * @param refreshFunc function fetching the current EC pipeline from SCM
+   * @return function mapping a BlockID to a single-node Standalone pipeline
+   */
+  protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
+      int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
+    return (blockID) -> {
+      Pipeline ecPipeline = refreshFunc.apply(blockID);
+      if (ecPipeline == null) {
+        return null;
+      }
+      DatanodeDetails curIndexNode = ecPipeline.getNodes()
+          .stream().filter(dn ->
+              ecPipeline.getReplicaIndex(dn) == replicaIndex)
+          .findAny().orElse(null);
+      if (curIndexNode == null) {
+        return null;
+      }
+      return Pipeline.newBuilder().setReplicationConfig(
+              StandaloneReplicationConfig.getInstance(
+                  HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Collections.singletonList(curIndexNode))
+          .setId(PipelineID.randomId())
+          .setState(Pipeline.PipelineState.CLOSED)
+          .build();
+    };
+  }
+
   /**
    * Returns the length of the Nth block in the block group, taking account of a
    * potentially partial last stripe. Note that the internal block index is
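The wrapper is needed because refreshFunction returns the whole EC pipeline, while each underlying block stream reads one internal block that lives on exactly one datanode; EC replica indexes are 1-based, which is why getOrOpenStream passes locationIndex + 1. Below is a minimal sketch of the resulting contract, written as if inside ECBlockInputStream so the new protected method is in scope; scmRefresh is a hypothetical stand-in for the real refresh function, which performs an SCM block-location lookup.

    // Hypothetical stand-in refresh function: here SCM reports no pipeline.
    Function<BlockID, Pipeline> scmRefresh = blkID -> null;

    // locationIndex is 0-based, EC replica indexes are 1-based, hence the +1.
    int locationIndex = 0;
    Function<BlockID, Pipeline> refreshForReplica =
        ecPipelineRefreshFunction(locationIndex + 1, scmRefresh);

    // With no pipeline (or no node carrying the requested replica index) the
    // wrapped function yields null, so the stream's normal read-failure
    // handling takes over; otherwise it yields a CLOSED single-node
    // Standalone pipeline for just that datanode.
    assert refreshForReplica.apply(new BlockID(1, 1)) == null;

The synthesized pipeline gets a fresh random PipelineID and CLOSED state because it is a client-side read-only construct, never a pipeline registered with SCM.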
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 536f618f7dc6..8cf8451775e7 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -22,8 +22,10 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -394,6 +396,41 @@ public void testErrorReadingBlockReportsBadLocation() throws IOException {
     }
   }
 
+  @Test
+  public void testEcPipelineRefreshFunction() {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    BlockLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+
+    BlockID blockID = new BlockID(1, 1);
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+    for (int i = 1; i <= 5; i++) {
+      dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+    }
+
+    // Create a refreshFunction that returns a hard-coded EC pipeline.
+    Function<BlockID, Pipeline> refreshFunction = blkID -> Pipeline.newBuilder()
+        .setReplicationConfig(repConfig)
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .build();
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      Pipeline pipeline =
+          ecb.ecPipelineRefreshFunction(3, refreshFunction).apply(blockID);
+      // Check the pipeline is built with the correct datanode and the
+      // right replicaIndex.
+      Assertions.assertEquals(HddsProtos.ReplicationType.STAND_ALONE,
+          pipeline.getReplicationConfig().getReplicationType());
+      Assertions.assertEquals(1, pipeline.getNodes().size());
+      Assertions.assertEquals(3, dnMap.get(pipeline.getNodes().get(0)));
+    }
+  }
+
   private void validateBufferContents(ByteBuffer buf, int from, int to,
       byte val) {
     for (int i = from; i < to; i++) {
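The new test covers the happy path only. A companion check for the miss path, hypothetical and not part of the patch, could sit inside the same try-with-resources block: replica index 9 does not exist in the five-node dnMap, so the wrapped refresh function should return null rather than fabricate a pipeline.

    // Hypothetical extra assertion (not in the patch), valid inside the try
    // block above: no datanode carries replica index 9, so the wrapper
    // returns null.
    Assertions.assertNull(
        ecb.ecPipelineRefreshFunction(9, refreshFunction).apply(blockID));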