diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 5c44f31c0d67..55dc0557bfbf 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -64,7 +64,7 @@ public class BlockInputStream extends BlockExtendedInputStream { private final BlockID blockID; private final long length; private Pipeline pipeline; - private final Token token; + private Token token; private final boolean verifyChecksum; private XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; @@ -103,19 +103,19 @@ public class BlockInputStream extends BlockExtendedInputStream { // can be reset if a new position is seeked. private int chunkIndexOfPrevPosition; - private final Function refreshPipelineFunction; + private final Function refreshFunction; public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, boolean verifyChecksum, XceiverClientFactory xceiverClientFactory, - Function refreshPipelineFunction) { + Function refreshFunction) { this.blockID = blockId; this.length = blockLen; this.pipeline = pipeline; this.token = token; this.verifyChecksum = verifyChecksum; this.xceiverClientFactory = xceiverClientFactory; - this.refreshPipelineFunction = refreshPipelineFunction; + this.refreshFunction = refreshFunction; } public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, @@ -150,12 +150,12 @@ public synchronized void initialize() throws IOException { } catch (SCMSecurityException ex) { throw ex; } catch (StorageContainerException ex) { - refreshPipeline(ex); + refreshBlockInfo(ex); catchEx = ex; } catch (IOException ex) { LOG.debug("Retry to get chunk info fail", ex); if (isConnectivityIssue(ex)) { - refreshPipeline(ex); + refreshBlockInfo(ex); } catchEx = ex; } @@ -199,17 +199,19 @@ private boolean isConnectivityIssue(IOException ex) { return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode(); } - private void refreshPipeline(IOException cause) throws IOException { + private void refreshBlockInfo(IOException cause) throws IOException { LOG.info("Unable to read information for block {} from pipeline {}: {}", blockID, pipeline.getId(), cause.getMessage()); - if (refreshPipelineFunction != null) { - LOG.debug("Re-fetching pipeline for block {}", blockID); - Pipeline newPipeline = refreshPipelineFunction.apply(blockID); - if (newPipeline == null) { - LOG.debug("No new pipeline for block {}", blockID); + if (refreshFunction != null) { + LOG.debug("Re-fetching pipeline and block token for block {}", blockID); + BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID); + if (blockLocationInfo == null) { + LOG.debug("No new block location info for block {}", blockID); } else { - LOG.debug("New pipeline for block {}: {}", blockID, newPipeline); - this.pipeline = newPipeline; + LOG.debug("New block location info for block {}: {}", + blockID, blockLocationInfo); + this.pipeline = blockLocationInfo.getPipeline(); + this.token = blockLocationInfo.getToken(); } } else { throw cause; @@ -526,7 +528,7 @@ private void handleReadError(IOException cause) throws IOException { } } - refreshPipeline(cause); + refreshBlockInfo(cause); } @VisibleForTesting diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java index 67032160169a..bd100214ae48 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java @@ -43,13 +43,13 @@ public interface BlockInputStreamFactory { * @param token The block Access Token * @param verifyChecksum Whether to verify checksums or not. * @param xceiverFactory Factory to create the xceiver in the client - * @param refreshFunction Function to refresh the pipeline if needed + * @param refreshFunction Function to refresh the block location if needed * @return BlockExtendedInputStream of the correct type. */ BlockExtendedInputStream create(ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, Token token, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction); + Function refreshFunction); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java index ba05ec2ed8a7..40063f9ce492 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java @@ -78,7 +78,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, Token token, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction) { if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) { return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig, blockInfo, verifyChecksum, xceiverFactory, refreshFunction, diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java index 012c14ceceb2..dc354198ca6d 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java @@ -62,7 +62,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream { private final BlockInputStreamFactory streamFactory; private final boolean verifyChecksum; private final XceiverClientFactory xceiverClientFactory; - private final Function refreshFunction; + private final Function refreshFunction; private final BlockLocationInfo blockInfo; private final DatanodeDetails[] dataLocations; private final BlockExtendedInputStream[] blockStreams; @@ -120,8 +120,9 @@ protected int availableParityLocations() { public ECBlockInputStream(ECReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, - XceiverClientFactory xceiverClientFactory, Function refreshFunction, BlockInputStreamFactory streamFactory) { + XceiverClientFactory xceiverClientFactory, + Function refreshFunction, + BlockInputStreamFactory streamFactory) { this.repConfig = repConfig; this.ecChunkSize = repConfig.getEcChunkSize(); this.verifyChecksum = verifyChecksum; @@ -215,13 +216,14 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { * @param refreshFunc * @return */ - protected Function ecPipelineRefreshFunction( - int replicaIndex, Function refreshFunc) { + protected Function ecPipelineRefreshFunction( + int replicaIndex, Function refreshFunc) { return (blockID) -> { - Pipeline ecPipeline = refreshFunc.apply(blockID); - if (ecPipeline == null) { + BlockLocationInfo blockLocationInfo = refreshFunc.apply(blockID); + if (blockLocationInfo == null) { return null; } + Pipeline ecPipeline = blockLocationInfo.getPipeline(); DatanodeDetails curIndexNode = ecPipeline.getNodes() .stream().filter(dn -> ecPipeline.getReplicaIndex(dn) == replicaIndex) @@ -229,13 +231,15 @@ protected Function ecPipelineRefreshFunction( if (curIndexNode == null) { return null; } - return Pipeline.newBuilder().setReplicationConfig( + Pipeline pipeline = Pipeline.newBuilder().setReplicationConfig( StandaloneReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE)) .setNodes(Collections.singletonList(curIndexNode)) .setId(PipelineID.randomId()) .setState(Pipeline.PipelineState.CLOSED) .build(); + blockLocationInfo.setPipeline(pipeline); + return blockLocationInfo; }; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java index c9d2b76a78de..0e2ef22c1e94 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; @@ -47,12 +46,12 @@ public interface ECBlockInputStreamFactory { * @param blockInfo The blockInfo representing the block. * @param verifyChecksum Whether to verify checksums or not. * @param xceiverFactory Factory to create the xceiver in the client - * @param refreshFunction Function to refresh the pipeline if needed + * @param refreshFunction Function to refresh the block location if needed * @return BlockExtendedInputStream of the correct type. */ BlockExtendedInputStream create(boolean missingLocations, List failedLocations, ReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction); + Function refreshFunction); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java index efc3b31c8491..36b6539ea817 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.io.ByteBufferPool; @@ -77,7 +76,7 @@ public BlockExtendedInputStream create(boolean missingLocations, List failedLocations, ReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction) { if (missingLocations) { // We create the reconstruction reader ECBlockReconstructedStripeInputStream sis = diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java index 47758eae0251..973561616f7b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java @@ -51,7 +51,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream { private final ECReplicationConfig repConfig; private final boolean verifyChecksum; private final XceiverClientFactory xceiverClientFactory; - private final Function refreshFunction; + private final Function refreshFunction; private final BlockLocationInfo blockInfo; private final ECBlockInputStreamFactory ecBlockInputStreamFactory; @@ -99,7 +99,8 @@ public static int availableDataLocations(Pipeline pipeline, public ECBlockInputStreamProxy(ECReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, XceiverClientFactory xceiverClientFactory, Function refreshFunction, ECBlockInputStreamFactory streamFactory) { + BlockLocationInfo> refreshFunction, + ECBlockInputStreamFactory streamFactory) { this.repConfig = repConfig; this.verifyChecksum = verifyChecksum; this.blockInfo = blockInfo; diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java index 0d11b88b3661..9658fb784ddc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; @@ -152,8 +151,9 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream { @SuppressWarnings("checkstyle:ParameterNumber") public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, - XceiverClientFactory xceiverClientFactory, Function refreshFunction, BlockInputStreamFactory streamFactory, + XceiverClientFactory xceiverClientFactory, + Function refreshFunction, + BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool, ExecutorService ecReconstructExecutor) { super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory, diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java index 1c7968b13446..be72dd070160 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java @@ -46,7 +46,7 @@ class DummyBlockInputStream extends BlockInputStream { Token token, boolean verifyChecksum, XceiverClientFactory xceiverClientManager, - Function refreshFunction, + Function refreshFunction, List chunkList, Map chunks) { super(blockId, blockLen, pipeline, token, verifyChecksum, diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java index 5a029763d63e..b39ed61d703b 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java @@ -18,23 +18,22 @@ package org.apache.hadoop.hdds.scm.storage; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.XceiverClientFactory; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * A dummy BlockInputStream with pipeline refresh function to mock read @@ -60,13 +59,15 @@ final class DummyBlockInputStreamWithRetry super(blockId, blockLen, pipeline, token, verifyChecksum, xceiverClientManager, blockID -> { isRerfreshed.set(true); - return Pipeline.newBuilder() - .setState(Pipeline.PipelineState.OPEN) - .setId(PipelineID.randomId()) - .setReplicationConfig(StandaloneReplicationConfig.getInstance( - ReplicationFactor.ONE)) - .setNodes(Collections.emptyList()) - .build(); + try { + BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class); + Pipeline mockPipeline = MockPipeline.createPipeline(1); + when(blockLocationInfo.getPipeline()).thenReturn(mockPipeline); + return blockLocationInfo; + } catch (IOException e) { + throw new RuntimeException(e); + } + }, chunkList, chunkMap); this.ioException = ioException; } diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 84813f3c6d0c..04805576fd10 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -84,18 +84,18 @@ public class TestBlockInputStream { private List chunks; private Map chunkDataMap; - private Function refreshPipeline; + private Function refreshFunction; @BeforeEach @SuppressWarnings("unchecked") public void setup() throws Exception { - refreshPipeline = Mockito.mock(Function.class); + refreshFunction = Mockito.mock(Function.class); BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE); createChunkList(5); blockStream = new DummyBlockInputStream(blockID, blockSize, null, null, - false, null, refreshPipeline, chunks, chunkDataMap); + false, null, refreshFunction, chunks, chunkDataMap); } /** @@ -290,15 +290,26 @@ public void testRefreshPipelineFunction() throws Exception { void refreshesPipelineOnReadFailure(IOException ex) throws Exception { // GIVEN Pipeline pipeline = MockPipeline.createSingleNodePipeline(); + BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class); + when(blockLocationInfo.getPipeline()).thenReturn(pipeline); Pipeline newPipeline = MockPipeline.createSingleNodePipeline(); + BlockLocationInfo newBlockLocationInfo = mock(BlockLocationInfo.class); - testRefreshesPipelineOnReadFailure(ex, pipeline, id -> newPipeline); - testRefreshesPipelineOnReadFailure(ex, pipeline, id -> pipeline); - testRefreshesPipelineOnReadFailure(ex, pipeline, id -> null); + testRefreshesPipelineOnReadFailure(ex, blockLocationInfo, + id -> newBlockLocationInfo); + + when(newBlockLocationInfo.getPipeline()).thenReturn(newPipeline); + testRefreshesPipelineOnReadFailure(ex, blockLocationInfo, + id -> blockLocationInfo); + + when(newBlockLocationInfo.getPipeline()).thenReturn(null); + testRefreshesPipelineOnReadFailure(ex, blockLocationInfo, + id -> newBlockLocationInfo); } private void testRefreshesPipelineOnReadFailure(IOException ex, - Pipeline pipeline, Function refreshFunction) + BlockLocationInfo blockLocationInfo, + Function refreshPipelineFunction) throws Exception { BlockID blockID = new BlockID(new ContainerBlockID(1, 1)); @@ -306,10 +317,11 @@ private void testRefreshesPipelineOnReadFailure(IOException ex, final int len = 200; final ChunkInputStream stream = throwingChunkInputStream(ex, len, true); - when(refreshPipeline.apply(any())) - .thenAnswer(inv -> refreshFunction.apply(blockID)); + when(this.refreshFunction.apply(any())) + .thenAnswer(inv -> refreshPipelineFunction.apply(blockID)); - try (BlockInputStream subject = createSubject(blockID, pipeline, stream)) { + try (BlockInputStream subject = createSubject(blockID, + blockLocationInfo.getPipeline(), stream)) { subject.initialize(); // WHEN @@ -318,9 +330,9 @@ private void testRefreshesPipelineOnReadFailure(IOException ex, // THEN Assert.assertEquals(len, bytesRead); - verify(refreshPipeline).apply(blockID); + verify(this.refreshFunction).apply(blockID); } finally { - reset(refreshPipeline); + reset(this.refreshFunction); } } @@ -349,7 +361,7 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex, private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline, ChunkInputStream stream) { return new DummyBlockInputStream(blockID, blockSize, pipeline, null, false, - null, refreshPipeline, chunks, null) { + null, refreshFunction, chunks, null) { @Override protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { return stream; @@ -376,7 +388,7 @@ public void testReadNotRetriedOnOtherException(IOException ex) () -> subject.read(new byte[len], 0, len)); // THEN - verify(refreshPipeline, never()).apply(blockID); + verify(refreshFunction, never()).apply(blockID); } } @@ -396,17 +408,19 @@ public void testRefreshOnReadFailureAfterUnbuffer(IOException ex) Pipeline newPipeline = MockPipeline.createSingleNodePipeline(); XceiverClientFactory clientFactory = mock(XceiverClientFactory.class); XceiverClientSpi client = mock(XceiverClientSpi.class); + BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class); when(clientFactory.acquireClientForReadData(pipeline)) .thenReturn(client); final int len = 200; final ChunkInputStream stream = throwingChunkInputStream(ex, len, true); - when(refreshPipeline.apply(blockID)) - .thenReturn(newPipeline); + when(refreshFunction.apply(blockID)) + .thenReturn(blockLocationInfo); + when(blockLocationInfo.getPipeline()).thenReturn(newPipeline); BlockInputStream subject = new BlockInputStream(blockID, blockSize, - pipeline, null, false, clientFactory, refreshPipeline) { + pipeline, null, false, clientFactory, refreshFunction) { @Override protected List getChunkInfos() throws IOException { acquireClient(); @@ -429,7 +443,7 @@ protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) { // THEN Assert.assertEquals(len, bytesRead); - verify(refreshPipeline).apply(blockID); + verify(refreshFunction).apply(blockID); verify(clientFactory).acquireClientForReadData(pipeline); verify(clientFactory).releaseClientForReadData(client, false); } finally { diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java index f4d40c811c78..0fe5886f1b7d 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java @@ -258,7 +258,7 @@ public synchronized BlockExtendedInputStream create( BlockLocationInfo blockInfo, Pipeline pipeline, Token token, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction) { int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0)); TestBlockInputStream stream = new TestBlockInputStream( diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java index 07c6b39234d3..caa071b1b9ca 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java @@ -462,18 +462,26 @@ public void testEcPipelineRefreshFunction() { } // Create a refreshFunction that returns a hard-coded EC pipeline. - Function refreshFunction = blkID -> Pipeline.newBuilder() - .setReplicationConfig(repConfig) - .setNodes(new ArrayList<>(dnMap.keySet())) - .setReplicaIndexes(dnMap) - .setState(Pipeline.PipelineState.CLOSED) - .setId(PipelineID.randomId()) - .build(); + Function refreshFunction = blkID -> { + Pipeline pipeline = Pipeline.newBuilder() + .setReplicationConfig(repConfig) + .setNodes(new ArrayList<>(dnMap.keySet())) + .setReplicaIndexes(dnMap) + .setState(Pipeline.PipelineState.CLOSED) + .setId(PipelineID.randomId()) + .build(); + BlockLocationInfo blockLocation = new BlockLocationInfo.Builder() + .setPipeline(pipeline) + .build(); + return blockLocation; + }; try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig, keyInfo, true, null, null, streamFactory)) { Pipeline pipeline = - ecb.ecPipelineRefreshFunction(3, refreshFunction).apply(blockID); + ecb.ecPipelineRefreshFunction(3, refreshFunction) + .apply(blockID) + .getPipeline(); // Check the pipeline is built with the correct Datanode // with right replicaIndex. Assertions.assertEquals(HddsProtos.ReplicationType.STAND_ALONE, @@ -503,7 +511,7 @@ public synchronized BlockExtendedInputStream create( ReplicationConfig repConfig, BlockLocationInfo blockInfo, Pipeline pipeline, Token token, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction) { TestBlockInputStream stream = new TestBlockInputStream( blockInfo.getBlockID(), blockInfo.getLength(), (byte)blockStreams.size()); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java index 89ac7a831ede..929fa13042e4 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.junit.jupiter.api.Assertions; @@ -378,7 +377,7 @@ public BlockExtendedInputStream create(boolean missingLocations, List failedDatanodes, ReplicationConfig repConfig, BlockLocationInfo blockInfo, boolean verifyChecksum, XceiverClientFactory xceiverFactory, - Function refreshFunction) { + Function refreshFunction) { this.failedLocations = failedDatanodes; ByteBuffer wrappedBuffer = ByteBuffer.wrap(data.array(), 0, data.capacity()); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index ce068f1b3643..91d4b944045d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.XceiverClientFactory; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; @@ -85,14 +84,16 @@ private static List createStreams( xceiverClientFactory, keyBlockID -> { OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo); - return getPipeline(newKeyInfo, omKeyLocationInfo.getBlockID()); + return getBlockLocationInfo(newKeyInfo, + omKeyLocationInfo.getBlockID()); }); partStreams.add(stream); } return partStreams; } - private static Pipeline getPipeline(OmKeyInfo newKeyInfo, BlockID blockID) { + private static BlockLocationInfo getBlockLocationInfo(OmKeyInfo newKeyInfo, + BlockID blockID) { List collect = newKeyInfo.getLatestVersionLocations() .getLocationList() @@ -100,7 +101,7 @@ private static Pipeline getPipeline(OmKeyInfo newKeyInfo, BlockID blockID) { .filter(l -> l.getBlockID().equals(blockID)) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(collect)) { - return collect.get(0).getPipeline(); + return collect.get(0); } else { return null; }