diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ECReplicationConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ECReplicationConfig.java
index 1d9da50a2ac..5b7d0338480 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ECReplicationConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ECReplicationConfig.java
@@ -91,4 +91,5 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Objects.hash(data, parity);
   }
+
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 90a6d94bea1..c440b6494ef 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -34,6 +34,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -278,6 +279,44 @@ public ReplicationConfig getReplicationConfig() {
     return replicationConfig;
   }
 
+  public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
+      throws UnknownPipelineStateException {
+    Preconditions.checkNotNull(pipeline, "Pipeline is null");
+
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (DatanodeDetailsProto member : pipeline.getMembersList()) {
+      nodes.add(DatanodeDetails.getFromProtoBuf(member));
+    }
+    UUID leaderId = null;
+    if (pipeline.hasLeaderID128()) {
+      HddsProtos.UUID uuid = pipeline.getLeaderID128();
+      leaderId = new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+    } else if (pipeline.hasLeaderID() &&
+        StringUtils.isNotEmpty(pipeline.getLeaderID())) {
+      leaderId = UUID.fromString(pipeline.getLeaderID());
+    }
+
+    UUID suggestedLeaderId = null;
+    if (pipeline.hasSuggestedLeaderID()) {
+      HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID();
+      suggestedLeaderId =
+          new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+    }
+
+    final ReplicationConfig config = ReplicationConfig
+        .fromProto(pipeline.getType(), pipeline.getFactor(),
+            pipeline.getEcReplicationConfig());
+    return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
+        .setReplicationConfig(config)
+        .setState(PipelineState.fromProtobuf(pipeline.getState()))
+        .setNodes(nodes)
+        .setLeaderId(leaderId)
+        .setSuggestedLeaderId(suggestedLeaderId)
+        .setNodesInOrder(pipeline.getMemberOrdersList())
+        .setCreateTimestamp(pipeline.getCreationTimeStamp())
+        .build();
+  }
+
   public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
       throws UnknownPipelineStateException {
 
@@ -292,13 +331,18 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
     HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder()
         .setId(id.getProtobuf())
         .setType(replicationConfig.getReplicationType())
-        .setFactor(ReplicationConfig.getLegacyFactor(replicationConfig))
         .setState(PipelineState.getProtobuf(state))
         .setLeaderID(leaderId != null ?
leaderId.toString() : "") .setCreationTimeStamp(creationTimestamp.toEpochMilli()) .addAllMembers(members) .addAllMemberReplicaIndexes(memberReplicaIndexes); + if (replicationConfig instanceof ECReplicationConfig) { + builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig) + .toProto()); + } else { + builder.setFactor(ReplicationConfig.getLegacyFactor(replicationConfig)); + } if (leaderId != null) { HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder() .setMostSigBits(leaderId.getMostSignificantBits()) @@ -335,43 +379,6 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion) return builder.build(); } - public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) - throws UnknownPipelineStateException { - Preconditions.checkNotNull(pipeline, "Pipeline is null"); - - List nodes = new ArrayList<>(); - for (DatanodeDetailsProto member : pipeline.getMembersList()) { - nodes.add(DatanodeDetails.getFromProtoBuf(member)); - } - UUID leaderId = null; - if (pipeline.hasLeaderID128()) { - HddsProtos.UUID uuid = pipeline.getLeaderID128(); - leaderId = new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); - } else if (pipeline.hasLeaderID() && - StringUtils.isNotEmpty(pipeline.getLeaderID())) { - leaderId = UUID.fromString(pipeline.getLeaderID()); - } - - UUID suggestedLeaderId = null; - if (pipeline.hasSuggestedLeaderID()) { - HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID(); - suggestedLeaderId = - new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); - } - - final ReplicationConfig config = ReplicationConfig - .fromProto(pipeline.getType(), pipeline.getFactor()); - return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId())) - .setReplicationConfig(config) - .setState(PipelineState.fromProtobuf(pipeline.getState())) - .setNodes(nodes) - .setLeaderId(leaderId) - .setSuggestedLeaderId(suggestedLeaderId) - .setNodesInOrder(pipeline.getMemberOrdersList()) - .setCreateTimestamp(pipeline.getCreationTimeStamp()) - .build(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java index b7b3dc6340d..1212b472815 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Objects; +import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -87,6 +88,24 @@ public static Pipeline createRatisPipeline() { .build(); } + public static Pipeline createEcPipeline() { + + List nodes = new ArrayList<>(); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + nodes.add(MockDatanodeDetails.randomDatanodeDetails()); + + return Pipeline.newBuilder() + .setState(Pipeline.PipelineState.OPEN) + .setId(PipelineID.randomId()) + .setReplicationConfig( + new ECReplicationConfig(3, 2)) + .setNodes(nodes) + .build(); + } + private MockPipeline() { throw new UnsupportedOperationException("no instances"); } diff --git 
index 504f9495def..2cf3bf08a90 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -48,4 +49,23 @@ public void protoIncludesNewPortsOnlyForV1() throws IOException {
       assertPorts(dn, ALL_PORTS);
     }
   }
+
+  @Test
+  public void getProtobufMessageEC() throws IOException {
+    Pipeline subject = MockPipeline.createPipeline(3);
+
+    //when EC config is empty/null
+    HddsProtos.Pipeline protobufMessage = subject.getProtobufMessage(1);
+    Assert.assertEquals(0, protobufMessage.getEcReplicationConfig().getData());
+
+
+    //when EC config is NOT empty
+    subject = MockPipeline.createEcPipeline();
+
+    protobufMessage = subject.getProtobufMessage(1);
+    Assert.assertEquals(3, protobufMessage.getEcReplicationConfig().getData());
+    Assert
+        .assertEquals(2, protobufMessage.getEcReplicationConfig().getParity());
+
+  }
 }
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index dbbb284debf..ba8f916c4f9 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -114,6 +114,7 @@ message Pipeline {
     optional uint64 creationTimeStamp = 8;
     optional UUID suggestedLeaderID = 9;
     repeated uint32 memberReplicaIndexes = 10;
+    optional ECReplicationConfig ecReplicationConfig = 11;
     // TODO(runzhiwang): when leaderID is gone, specify 6 as the index of leaderID128
     optional UUID leaderID128 = 100;
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 1938afe94a5..e95fc072cb0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -181,7 +181,8 @@ public AllocateScmBlockResponseProto allocateScmBlock(
         request.getNumBlocks(),
         ReplicationConfig.fromProto(
             request.getType(),
-            request.getFactor()),
+            request.getFactor(),
+            request.getEcReplicationConfig()),
         request.getOwner(),
         ExcludeList.getFromProtoBuf(request.getExcludeList()));
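
Reviewer note (not part of the patch): the net effect of the change is that an
EC pipeline should now survive a protobuf round trip instead of being forced
through the legacy ReplicationFactor field. A minimal sketch of that intent,
assuming the three-argument ReplicationConfig.fromProto reconstructs an
ECReplicationConfig from the new ecReplicationConfig message (which is what
this patch wires up):

    // Serialize an EC pipeline (data=3, parity=2) using the helper added above.
    Pipeline ecPipeline = MockPipeline.createEcPipeline();
    HddsProtos.Pipeline proto = ecPipeline.getProtobufMessage(1);

    // For EC, the legacy factor field is left unset; the new message carries
    // the data/parity counts instead.
    Assert.assertEquals(3, proto.getEcReplicationConfig().getData());
    Assert.assertEquals(2, proto.getEcReplicationConfig().getParity());

    // Deserializing should restore an equal replication config
    // (ECReplicationConfig defines equals/hashCode over data and parity).
    Pipeline restored = Pipeline.getFromProtobuf(proto);
    Assert.assertEquals(ecPipeline.getReplicationConfig(),
        restored.getReplicationConfig());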