diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 96ab1fabbb1..1003eca8f05 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.EOFException;
@@ -32,7 +30,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -295,10 +292,7 @@ private void setPipeline(Pipeline pipeline) throws IOException {
     boolean okForRead =
         pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
             || pipeline.getType() == HddsProtos.ReplicationType.EC;
-    Pipeline readPipeline = okForRead ? pipeline : Pipeline.newBuilder(pipeline)
-        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-            getLegacyFactor(pipeline.getReplicationConfig())))
-        .build();
+    Pipeline readPipeline = okForRead ? pipeline : pipeline.copyForRead();
     pipelineRef.set(readPipeline);
   }
 
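Reviewer note: the `okForRead` guard plus `copyForRead()` above is the pattern this whole patch rolls out. A minimal JUnit-style sketch of the contract `BlockInputStream` now relies on (not part of the patch; `MockPipeline` is the existing hdds test helper, the class name is made up):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

class CopyForReadContractSketch {
  void ratisBecomesStandaloneForRead() {
    Pipeline ratis = MockPipeline.createRatisPipeline();
    Pipeline read = ratis.copyForRead();

    // RATIS/THREE converts to STAND_ALONE with the factor preserved,
    // matching what getLegacyFactor() produced in the removed code.
    assertEquals(HddsProtos.ReplicationType.STAND_ALONE, read.getType());
    assertEquals(ratis.getNodes(), read.getNodes());

    // Already-STAND_ALONE pipelines are returned unchanged.
    assertSame(read, read.copyForRead());
  }
}
```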
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index e7e9068e0b5..cff7d73dcd1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -39,7 +39,9 @@
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -49,6 +51,7 @@
 import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
 import org.apache.hadoop.hdds.utils.db.Proto2Codec;
 import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -425,8 +428,32 @@ public Pipeline copyWithNodesInOrder(List<DatanodeDetails> nodes) {
     return toBuilder().setNodesInOrder(nodes).build();
   }
 
+  public Pipeline copyForRead() {
+    if (replicationConfig.getReplicationType() == ReplicationType.STAND_ALONE) {
+      return this;
+    }
+
+    HddsProtos.ReplicationFactor factor = replicationConfig instanceof ReplicatedReplicationConfig
+        ? ((ReplicatedReplicationConfig) replicationConfig).getReplicationFactor()
+        : HddsProtos.ReplicationFactor.ONE;
+
+    return toBuilder()
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(factor))
+        .build();
+  }
+
+  public Pipeline copyForReadFromNode(DatanodeDetails node) {
+    Preconditions.assertTrue(nodeStatus.containsKey(node), () -> node + " is not part of the pipeline " + id.getId());
+
+    return toBuilder()
+        .setNodes(Collections.singletonList(node))
+        .setReplicaIndexes(Collections.singletonMap(node, getReplicaIndex(node)))
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE))
+        .build();
+  }
+
   public Builder toBuilder() {
-    return newBuilder(this);
+    return new Builder(this);
   }
 
   public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
@@ -465,7 +492,8 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
     final ReplicationConfig config = ReplicationConfig
         .fromProto(pipeline.getType(), pipeline.getFactor(),
             pipeline.getEcReplicationConfig());
-    return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
+    return newBuilder()
+        .setId(PipelineID.getFromProtobuf(pipeline.getId()))
         .setReplicationConfig(config)
         .setState(PipelineState.fromProtobuf(pipeline.getState()))
         .setNodes(new ArrayList<>(nodes.keySet()))
@@ -531,14 +559,10 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
-  public static Builder newBuilder(Pipeline pipeline) {
-    return new Builder(pipeline);
-  }
-
   /**
    * Builder class for Pipeline.
   */
-  public static class Builder {
+  public static final class Builder {
     private PipelineID id = null;
     private ReplicationConfig replicationConfig = null;
     private PipelineState state = null;
@@ -550,9 +574,9 @@ public static class Builder {
     private DatanodeID suggestedLeaderId = null;
     private Map<DatanodeDetails, Integer> replicaIndexes = ImmutableMap.of();
 
-    public Builder() { }
+    private Builder() { }
 
-    public Builder(Pipeline pipeline) {
+    private Builder(Pipeline pipeline) {
       this.id = pipeline.id;
       this.replicationConfig = pipeline.replicationConfig;
       this.state = pipeline.state;
@@ -599,8 +623,20 @@ public Builder setLeaderId(DatanodeID leaderId1) {
     }
 
     public Builder setNodes(List<DatanodeDetails> nodes) {
-      this.nodeStatus = new LinkedHashMap<>();
-      nodes.forEach(node -> nodeStatus.put(node, -1L));
+      Map<DatanodeDetails, Long> newNodeStatus = new LinkedHashMap<>();
+      nodes.forEach(node -> newNodeStatus.put(node, -1L));
+
+      // replace pipeline ID if nodes are not the same
+      if (nodeStatus != null && !nodeStatus.keySet().equals(newNodeStatus.keySet())) {
+        if (nodes.size() == 1) {
+          setId(nodes.iterator().next().getID());
+        } else {
+          setId(PipelineID.randomId());
+        }
+      }
+
+      nodeStatus = newNodeStatus;
+
       if (nodesInOrder != null) {
         // nodesInOrder may belong to another pipeline, avoid overwriting it
         nodesInOrder = new LinkedList<>(nodesInOrder);
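Two behavioural points in Pipeline.java worth calling out. First, `copyForReadFromNode` derives the copy's ID from the node itself (via the single-node branch in `Builder.setNodes`), so the same node always yields the same read pipeline ID. Second, `setNodes` now regenerates the ID whenever the node set actually changes, which is what lets `RpcClient` below drop its manual `setId(PipelineID.randomId())`. A hedged sketch of the expected behaviour (JUnit-style, assertions mirror the new tests; the class name is illustrative):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

class CopyForReadFromNodeSketch {
  void narrowsToOneReplica() {
    Pipeline ratis = MockPipeline.createRatisPipeline();
    DatanodeDetails node = ratis.getNodes().get(0);

    Pipeline read = ratis.copyForReadFromNode(node);

    // One node, STAND_ALONE/ONE, replica index carried over from the source.
    assertEquals(Collections.singletonList(node), read.getNodes());
    assertEquals(HddsProtos.ReplicationType.STAND_ALONE, read.getType());
    assertEquals(ratis.getReplicaIndex(node), read.getReplicaIndex(node));

    // Deterministic ID: derived from the node, stable across calls.
    assertEquals(node.getID().toPipelineID(), read.getId());
    assertEquals(read.getId(), ratis.copyForReadFromNode(node).getId());
  }
}
```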
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
index 4697e1f241b..c6ab6678e8b 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
@@ -17,15 +17,22 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static java.util.Collections.singletonList;
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
 import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
+import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.hdds.protocol.TestDatanodeDetails.assertPorts;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.apache.hadoop.ozone.ClientVersion.DEFAULT_VERSION;
 import static org.apache.hadoop.ozone.ClientVersion.VERSION_HANDLES_UNKNOWN_DN_PORTS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.junit.jupiter.api.Test;
@@ -91,14 +98,11 @@ public void testECPipelineIsAlwaysHealthy() {
   @Test
   public void testBuilderCopiesAllFieldsFromOtherPipeline() {
     Pipeline original = MockPipeline.createEcPipeline();
-    Pipeline copied = Pipeline.newBuilder(original).build();
+    Pipeline copied = original.toBuilder().build();
 
     assertEquals(original.getId(), copied.getId());
     assertEquals(original.getReplicationConfig(), copied.getReplicationConfig());
     assertEquals(original.getPipelineState(), copied.getPipelineState());
-    assertEquals(original.getId(), copied.getId());
-    assertEquals(original.getId(), copied.getId());
-    assertEquals(original.getId(), copied.getId());
     assertEquals(original.getNodeSet(), copied.getNodeSet());
     assertEquals(original.getNodesInOrder(), copied.getNodesInOrder());
     assertEquals(original.getLeaderId(), copied.getLeaderId());
@@ -111,4 +115,36 @@ public void testBuilderCopiesAllFieldsFromOtherPipeline() {
           copied.getReplicaIndex(dn));
     }
   }
+
+  @Test
+  void idChangedIfNodesReplaced() {
+    Pipeline original = MockPipeline.createRatisPipeline();
+
+    Pipeline withDifferentNodes = original.toBuilder()
+        .setNodes(Arrays.asList(randomDatanodeDetails(), randomDatanodeDetails(), randomDatanodeDetails()))
+        .build();
+
+    assertNotEquals(original.getId(), withDifferentNodes.getId());
+    withDifferentNodes.getNodes()
+        .forEach(node -> assertNotEquals(node.getID().toPipelineID(), withDifferentNodes.getId()));
+  }
+
+  @Test
+  void testCopyForReadFromNode() {
+    Pipeline subject = MockPipeline.createRatisPipeline();
+    DatanodeDetails node = subject.getNodes().iterator().next();
+
+    Pipeline copy = subject.copyForReadFromNode(node);
+
+    assertEquals(singletonList(node), copy.getNodes());
+    assertEquals(node.getID().toPipelineID(), copy.getId());
+    assertEquals(subject.getReplicaIndex(node), copy.getReplicaIndex(node));
+    assertEquals(StandaloneReplicationConfig.getInstance(ONE), copy.getReplicationConfig());
+  }
+
+  @Test
+  void testCopyForReadFromNodeRejectsUnknownNode() {
+    Pipeline subject = MockPipeline.createRatisPipeline();
+    assertThrows(IllegalStateException.class, () -> subject.copyForReadFromNode(randomDatanodeDetails()));
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 67212af4bed..a1d86bd7071 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -400,7 +400,7 @@ Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
       return pipeline;
     }
     Pipeline updatedPipeline = pipelineMap.compute(pipelineID,
-        (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
+        (id, p) -> pipeline.toBuilder().setState(state).build());
 
     List<Pipeline> pipelineList =
         query2OpenPipelines.get(pipeline.getReplicationConfig());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
index a5bf54b6249..460d08eb60d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
@@ -466,7 +466,7 @@ public void testQueryPipeline() throws IOException, TimeoutException {
     Pipeline pipeline2 = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
         HddsProtos.ReplicationFactor.THREE, 3);
-    pipeline2 = Pipeline.newBuilder(pipeline2)
+    pipeline2 = pipeline2.toBuilder()
         .setState(Pipeline.PipelineState.OPEN)
         .build();
     HddsProtos.Pipeline pipelineProto2 = pipeline2
diff --git a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index 072babf1f01..c5a60110fc4 100644
--- a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@ -328,7 +328,7 @@ private List<ContainerReplicaInfo> getReplicas(boolean includeIndex) {
   }
 
   private ContainerWithPipeline getContainerWithPipeline(long containerID) {
-    Pipeline pipeline = new Pipeline.Builder()
+    Pipeline pipeline = Pipeline.newBuilder()
         .setState(Pipeline.PipelineState.CLOSED)
         .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
         .setId(PipelineID.randomId())
@@ -349,7 +349,7 @@ private ContainerWithPipeline getContainerWithPipeline(long containerID) {
   }
 
   private ContainerWithPipeline getECContainerWithPipeline() {
-    Pipeline pipeline = new Pipeline.Builder()
+    Pipeline pipeline = Pipeline.newBuilder()
         .setState(Pipeline.PipelineState.CLOSED)
         .setReplicationConfig(new ECReplicationConfig(3, 2))
         .setId(PipelineID.randomId())
diff --git a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java
index 1009c35ba9b..ad63c84c860 100644
--- a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java
+++ b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestClosePipelinesSubCommand.java
@@ -148,7 +148,7 @@ private List<Pipeline> createPipelines() {
 
   private Pipeline createPipeline(ReplicationConfig repConfig,
       Pipeline.PipelineState state) {
-    return new Pipeline.Builder()
+    return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
         .setCreateTimestamp(System.currentTimeMillis())
         .setState(state)
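With the `Builder` constructors made private, `new Pipeline.Builder()` and `new Pipeline.Builder(pipeline)` no longer compile outside `Pipeline`; the CLI test changes above are the mechanical migration. For reference, the two surviving entry points (a sketch; field values are lifted from the tests, `datanodes` is a placeholder):

```java
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;

import java.util.List;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

final class BuilderMigrationSketch {
  // Building from scratch: Pipeline.newBuilder().
  static Pipeline closedRatisPipeline(List<DatanodeDetails> datanodes) {
    return Pipeline.newBuilder()
        .setId(PipelineID.randomId())
        .setState(Pipeline.PipelineState.CLOSED)
        .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
        .setNodes(datanodes)
        .build();
  }

  // Copying an existing pipeline: toBuilder(), as PipelineStateMap does above.
  static Pipeline reopened(Pipeline pipeline) {
    return pipeline.toBuilder().setState(Pipeline.PipelineState.OPEN).build();
  }
}
```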
diff --git a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
index b302382547c..2dc57b55265 100644
--- a/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
+++ b/hadoop-ozone/cli-admin/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
@@ -195,7 +195,7 @@ private List<Pipeline> createPipelines() {
 
   private Pipeline createPipeline(ReplicationConfig repConfig,
       Pipeline.PipelineState state) {
-    return new Pipeline.Builder()
+    return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
         .setCreateTimestamp(System.currentTimeMillis())
         .setState(state)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java
index 7566025de22..34259fbb371 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java
@@ -82,7 +82,7 @@ protected List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
       }
     }
 
-    pipeline = Pipeline.newBuilder(pipeline)
+    pipeline = pipeline.toBuilder()
         .setReplicationConfig(StandaloneReplicationConfig
             .getInstance(HddsProtos.ReplicationFactor.THREE))
         .setNodes(nodes)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
index 147b8ad5397..cded422180c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
@@ -20,10 +20,7 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -73,15 +70,8 @@ protected List<ContainerProtos.ChunkInfo> getChunkInfos(
     // irrespective of the container state, we will always read via Standalone
     // protocol.
     Token<OzoneBlockTokenIdentifier> token = keyLocationInfo.getToken();
-    Pipeline pipeline = keyLocationInfo.getPipeline();
+    Pipeline pipeline = keyLocationInfo.getPipeline().copyForRead();
     BlockID blockID = keyLocationInfo.getBlockID();
-    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
-      pipeline = Pipeline.newBuilder(pipeline)
-          .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-              ReplicationConfig
-                  .getLegacyFactor(pipeline.getReplicationConfig())))
-          .build();
-    }
 
     List<ContainerProtos.ChunkInfo> chunks;
     XceiverClientSpi xceiverClientSpi = null;
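Reviewer note on the two checksum helpers: they intentionally diverge. `ReplicatedFileChecksumHelper` can use `copyForRead()` because the factor-preserving conversion is exactly what the removed `getLegacyFactor` branch did, and the call is a no-op for pipelines that are already STAND_ALONE, so the old `if` guard is subsumed. `ECFileChecksumHelper` keeps the explicit `toBuilder()` chain since it forces STAND_ALONE/THREE and swaps in its own node list, which `copyForRead()` does not do. Condensed (a sketch, not patch code):

```java
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;

final class ReadPipelineSketch {
  // Replaces the removed guard-plus-rebuild: idempotent for STAND_ALONE,
  // factor-preserving for RATIS, factor ONE for anything non-replicated.
  static Pipeline forChecksum(OmKeyLocationInfo keyLocationInfo) {
    return keyLocationInfo.getPipeline().copyForRead();
  }
}
```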
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index dc6384562b7..293437c195f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -82,7 +82,6 @@
 import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
@@ -1579,11 +1578,7 @@ public OzoneInputStream getKey(
       List<DatanodeDetails> datanodes = pipelineBefore.getNodes();
       for (DatanodeDetails dn : datanodes) {
-        List<DatanodeDetails> nodes = new ArrayList<>();
-        nodes.add(dn);
-        Pipeline pipeline
-            = new Pipeline.Builder(pipelineBefore).setNodes(nodes)
-            .setId(PipelineID.randomId()).build();
+        Pipeline pipeline = pipelineBefore.copyForReadFromNode(dn);
         long length = replicationConfig instanceof ECReplicationConfig
             ? ECBlockInputStream.internalBlockLength(pipelineBefore.getReplicaIndex(dn),
                 (ECReplicationConfig) replicationConfig, locationInfo.getLength())
diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
index a678db91e97..2116a903fc3 100644
--- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
+++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.ozone.freon;
 
-import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
-
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -26,7 +24,6 @@
 import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientCreator;
@@ -144,10 +141,7 @@ public Void call() throws Exception {
         LOG.warn("Read only is not set to true for GRPC, setting it to true");
         readOnly = true;
       }
-      pipeline = Pipeline.newBuilder(pipeline)
-          .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-              getLegacyFactor(pipeline.getReplicationConfig())))
-          .build();
+      pipeline = pipeline.copyForRead();
     }
     encodedContainerToken = scmClient.getEncodedContainerToken(containerID);
     XceiverClientFactory xceiverClientManager;
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/BlockExistenceVerifier.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/BlockExistenceVerifier.java
index 6970e951c86..dde79989d02 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/BlockExistenceVerifier.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/BlockExistenceVerifier.java
@@ -17,11 +17,7 @@
 
 package org.apache.hadoop.ozone.debug.replicas;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-
 import java.io.IOException;
-import java.util.Collections;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -51,16 +47,10 @@ public BlockExistenceVerifier(OzoneConfiguration conf) throws IOException {
   }
 
   @Override
-  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation,
-      int replicaIndex) {
+  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation) {
     XceiverClientSpi client = null;
     try {
-      Pipeline pipeline = Pipeline.newBuilder(keyLocation.getPipeline())
-          .setId(datanode.getID())
-          .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
-          .setNodes(Collections.singletonList(datanode))
-          .setReplicaIndexes(Collections.singletonMap(datanode, replicaIndex))
-          .build();
+      Pipeline pipeline = keyLocation.getPipeline().copyForReadFromNode(datanode);
       client = xceiverClientManager.acquireClientForReadData(pipeline);
 
       ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
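The debug/replicas verifiers follow: `copyForReadFromNode` looks the replica index up from the key location's pipeline itself, which is why the `replicaIndex` parameter can be dropped from `verifyBlock` (see the interface change below). The equivalence being assumed, as a sketch (helper class name is illustrative):

```java
import java.util.Collections;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

final class ReplicaIndexEquivalenceSketch {
  // The narrowed pipeline should carry the same replica index that callers
  // previously passed in by hand via setReplicaIndexes(...).
  static void check(Pipeline keyPipeline, DatanodeDetails dn) {
    Pipeline narrowed = keyPipeline.copyForReadFromNode(dn);
    assert narrowed.getNodes().equals(Collections.singletonList(dn));
    assert narrowed.getReplicaIndex(dn) == keyPipeline.getReplicaIndex(dn);
  }
}
```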
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ChecksumVerifier.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ChecksumVerifier.java
index 7f099bede00..96f8218526d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ChecksumVerifier.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ChecksumVerifier.java
@@ -17,14 +17,10 @@
 
 package org.apache.hadoop.ozone.debug.replicas;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collections;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.output.NullOutputStream;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -57,14 +53,8 @@ public ChecksumVerifier(OzoneConfiguration conf) throws IOException {
   }
 
   @Override
-  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation,
-      int replicaIndex) {
-    Pipeline pipeline = Pipeline.newBuilder(keyLocation.getPipeline())
-        .setId(datanode.getID())
-        .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
-        .setNodes(Collections.singletonList(datanode))
-        .setReplicaIndexes(Collections.singletonMap(datanode, replicaIndex))
-        .build();
+  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation) {
+    Pipeline pipeline = keyLocation.getPipeline().copyForReadFromNode(datanode);
 
     try (InputStream is = new BlockInputStreamFactoryImpl().create(
         keyLocation.getPipeline().getReplicationConfig(),
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ContainerStateVerifier.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ContainerStateVerifier.java
index c34d2e17d3b..fef3030fd15 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ContainerStateVerifier.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ContainerStateVerifier.java
@@ -17,14 +17,10 @@
 
 package org.apache.hadoop.ozone.debug.replicas;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Objects;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
@@ -68,15 +64,14 @@ public String getType() {
   }
 
   @Override
-  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation,
-      int replicaIndex) {
+  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation) {
    try {
      StringBuilder replicaCheckMsg = new StringBuilder().append("Replica state is ");
      boolean pass = false;
 
      ContainerInfoToken containerInfoToken = getContainerInfoToken(keyLocation.getContainerID());
      ContainerDataProto containerData = fetchContainerDataFromDatanode(datanode, keyLocation.getContainerID(),
-          keyLocation, replicaIndex, containerInfoToken);
+          keyLocation, containerInfoToken);
 
      if (containerData == null) {
        return BlockVerificationResult.failIncomplete("No container data returned from DN.");
@@ -112,18 +107,13 @@ private boolean areContainerAndReplicasInGoodState(ContainerDataProto.State repl
   }
 
   private ContainerDataProto fetchContainerDataFromDatanode(DatanodeDetails dn, long containerId,
-      OmKeyLocationInfo keyLocation, int replicaIndex,
+      OmKeyLocationInfo keyLocation,
       ContainerInfoToken containerInfoToken) throws IOException {
     XceiverClientSpi client = null;
     ReadContainerResponseProto response;
     try {
-      Pipeline pipeline = Pipeline.newBuilder(keyLocation.getPipeline())
-          .setId(dn.getID())
-          .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
-          .setNodes(Collections.singletonList(dn))
-          .setReplicaIndexes(Collections.singletonMap(dn, replicaIndex))
-          .build();
+      Pipeline pipeline = keyLocation.getPipeline().copyForReadFromNode(dn);
 
       String encodedToken = containerInfoToken.getEncodedToken();
       client = xceiverClientManager.acquireClientForReadData(pipeline);
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicaVerifier.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicaVerifier.java
index af9bcdac4b8..07dae98de1d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicaVerifier.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicaVerifier.java
@@ -24,7 +24,7 @@
  * Functional interface for implementing a block verifier.
  */
 public interface ReplicaVerifier {
-  BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation, int replicaIndex);
+  BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation);
 
   String getType();
 }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java
index 2397f8c3d10..00ba8501f65 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java
@@ -184,7 +184,7 @@ void processKey(OzoneClient ozoneClient, String volumeName, String bucketName, S
       int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);
       for (ReplicaVerifier verifier : replicaVerifiers) {
-        BlockVerificationResult result = verifier.verifyBlock(datanode, keyLocation, replicaIndex);
+        BlockVerificationResult result = verifier.verifyBlock(datanode, keyLocation);
         ObjectNode checkNode = checksArray.addObject();
         checkNode.put("type", verifier.getType());
         checkNode.put("completed", result.isCompleted());
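With `replicaIndex` gone from the signature, `ReplicaVerifier` implementations that still need the index recover it locally, as `ReplicasVerify` does above. A minimal sketch of an implementation against the new interface (same package assumed; the class, message string, and use of `failIncomplete` are illustrative, since this diff only shows that one factory method):

```java
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;

final class IndexLoggingVerifierSketch implements ReplicaVerifier {
  @Override
  public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation) {
    // The index is no longer a parameter; derive it from the pipeline.
    int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);
    return BlockVerificationResult.failIncomplete("sketch only, replica index " + replicaIndex);
  }

  @Override
  public String getType() {
    return "sketch";
  }
}
```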
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/chunk/ChunkKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/chunk/ChunkKeyHandler.java
index 95b1a97914b..c2657f424ce 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/chunk/ChunkKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/chunk/ChunkKeyHandler.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.ozone.debug.replicas.chunk;
 
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
@@ -27,7 +25,6 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -106,9 +103,7 @@ protected void execute(OzoneClient client, OzoneAddress address)
         HddsProtos.ReplicationType.EC;
     Pipeline pipeline;
     if (!isECKey && keyPipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
-      pipeline = Pipeline.newBuilder(keyPipeline)
-          .setReplicationConfig(StandaloneReplicationConfig
-              .getInstance(ONE)).build();
+      pipeline = keyPipeline.copyForRead();
     } else {
       pipeline = keyPipeline;
     }
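Last call site: `ChunkKeyHandler` keeps its EC guard because EC keys must not be rewritten to STAND_ALONE here, while for everything else `copyForRead()` applies (and its STAND_ALONE early-return makes the second half of the old condition redundant). Compressed logic, as a sketch (the real `isECKey` is computed from the key's replication config; it is approximated from the pipeline here):

```java
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

final class ChunkReadPipelineSketch {
  // EC pipelines pass through untouched; everything else is narrowed to a
  // STAND_ALONE read pipeline, with copyForRead() a no-op when it already is.
  static Pipeline forChunkRead(Pipeline keyPipeline) {
    boolean isECKey = keyPipeline.getType() == HddsProtos.ReplicationType.EC;
    return isECKey ? keyPipeline : keyPipeline.copyForRead();
  }
}
```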