@@ -30,6 +30,8 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
@@ -196,7 +198,10 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
// protocol.
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
.setReplicationConfig(new StandaloneReplicationConfig(
ReplicationConfig
.getLegacyFactor(pipeline.getReplicationConfig())))
.build();
}
acquireClient();
boolean success = false;
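For context, here is the read-path conversion from the hunk above as a standalone sketch. The helper class and method names are invented for illustration; the Ozone classes are assumed to be on the classpath.

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

// Hypothetical helper mirroring the hunk above: reads go over the
// STAND_ALONE protocol regardless of how the data was written, so a
// non-standalone (e.g. Ratis) pipeline is rebuilt with a
// StandaloneReplicationConfig that keeps the original replication factor.
final class ReadPipelineConverter {

  private ReadPipelineConverter() { }

  static Pipeline toStandalone(Pipeline pipeline) {
    if (pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE) {
      return pipeline;
    }
    return Pipeline.newBuilder(pipeline)
        .setReplicationConfig(new StandaloneReplicationConfig(
            ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig())))
        .build();
  }
}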
@@ -24,8 +24,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -59,8 +60,8 @@ final class DummyBlockInputStreamWithRetry
return Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE)
.setReplicationConfig(new StandaloneReplicationConfig(
ReplicationFactor.ONE))
.setNodes(Collections.emptyList())
.build();
}, chunkList, chunkMap);
@@ -33,10 +33,10 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +50,7 @@ public final class Pipeline {

private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private final PipelineID id;
private final ReplicationType type;
private final ReplicationFactor factor;
private final ReplicationConfig replicationConfig;

private PipelineState state;
private Map<DatanodeDetails, Long> nodeStatus;
@@ -69,12 +68,11 @@ public final class Pipeline {
* ContainerStateManager#getMatchingContainerByPipeline to take a lock on
* the container allocations for a particular pipeline.
*/
private Pipeline(PipelineID id, ReplicationType type,
ReplicationFactor factor, PipelineState state,
private Pipeline(PipelineID id,
ReplicationConfig replicationConfig, PipelineState state,
Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) {
this.id = id;
this.type = type;
this.factor = factor;
this.replicationConfig = replicationConfig;
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = Instant.now();
@@ -96,16 +94,7 @@ public PipelineID getId() {
* @return type - Simple or Ratis.
*/
public ReplicationType getType() {
return type;
}

/**
* Returns the factor.
*
* @return type - Simple or Ratis.
*/
public ReplicationFactor getFactor() {
return factor;
return replicationConfig.getReplicationType();
}

/**
@@ -186,6 +175,7 @@ public boolean sameDatanodes(Pipeline pipeline) {
return getNodeSet().equals(pipeline.getNodeSet());
}


/**
* Returns the leader if found else defaults to closest node.
*
@@ -266,6 +256,10 @@ public boolean isEmpty() {
return nodeStatus.isEmpty();
}

public ReplicationConfig getReplicationConfig() {
return replicationConfig;
}

public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
throws UnknownPipelineStateException {
List<HddsProtos.DatanodeDetailsProto> members = new ArrayList<>();
@@ -275,8 +269,8 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion)

HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder()
.setId(id.getProtobuf())
.setType(type)
.setFactor(factor)
.setType(replicationConfig.getReplicationType())
.setFactor(ReplicationConfig.getLegacyFactor(replicationConfig))
.setState(PipelineState.getProtobuf(state))
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.setCreationTimeStamp(creationTimestamp.toEpochMilli())
@@ -342,9 +336,10 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
}

final ReplicationConfig config = ReplicationConfig
.fromProto(pipeline.getType(), pipeline.getFactor());
return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
.setFactor(pipeline.getFactor())
.setType(pipeline.getType())
.setReplicationConfig(config)
.setState(PipelineState.fromProtobuf(pipeline.getState()))
.setNodes(nodes)
.setLeaderId(leaderId)
@@ -367,8 +362,7 @@ public boolean equals(Object o) {

return new EqualsBuilder()
.append(id, that.id)
.append(type, that.type)
.append(factor, that.factor)
.append(replicationConfig, that.replicationConfig)
.append(getNodes(), that.getNodes())
.isEquals();
}
@@ -377,8 +371,7 @@ public boolean equals(Object o) {
public int hashCode() {
return new HashCodeBuilder()
.append(id)
.append(type)
.append(factor)
.append(replicationConfig.getReplicationType())
.append(nodeStatus)
.toHashCode();
}
@@ -390,8 +383,7 @@ public String toString() {
b.append(" Id: ").append(id.getId());
b.append(", Nodes: ");
nodeStatus.keySet().forEach(b::append);
b.append(", Type:").append(getType());
b.append(", Factor:").append(getFactor());
b.append(", ReplicationConfig: ").append(replicationConfig);
b.append(", State:").append(getPipelineState());
b.append(", leaderId:").append(leaderId != null ? leaderId.toString() : "");
b.append(", CreationTimestamp").append(getCreationTimestamp());
@@ -412,8 +404,7 @@ public static Builder newBuilder(Pipeline pipeline) {
*/
public static class Builder {
private PipelineID id = null;
private ReplicationType type = null;
private ReplicationFactor factor = null;
private ReplicationConfig replicationConfig = null;
private PipelineState state = null;
private Map<DatanodeDetails, Long> nodeStatus = null;
private List<Integer> nodeOrder = null;
@@ -426,8 +417,7 @@ public Builder() {}

public Builder(Pipeline pipeline) {
this.id = pipeline.id;
this.type = pipeline.type;
this.factor = pipeline.factor;
this.replicationConfig = pipeline.replicationConfig;
this.state = pipeline.state;
this.nodeStatus = pipeline.nodeStatus;
this.nodesInOrder = pipeline.nodesInOrder.get();
@@ -441,13 +431,8 @@ public Builder setId(PipelineID id1) {
return this;
}

public Builder setType(ReplicationType type1) {
this.type = type1;
return this;
}

public Builder setFactor(ReplicationFactor factor1) {
this.factor = factor1;
public Builder setReplicationConfig(ReplicationConfig replicationConf) {
this.replicationConfig = replicationConf;
return this;
}

@@ -484,12 +469,12 @@ public Builder setSuggestedLeaderId(UUID uuid) {

public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
Preconditions.checkNotNull(factor);
Preconditions.checkNotNull(replicationConfig);
Preconditions.checkNotNull(state);
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline =
new Pipeline(id, type, factor, state, nodeStatus, suggestedLeaderId);
new Pipeline(id, replicationConfig, state, nodeStatus,
suggestedLeaderId);
pipeline.setLeaderId(leaderId);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
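For orientation, a minimal sketch of what builder call sites look like after this change; it mirrors the MockPipeline updates further down. The empty node list and the Ratis/THREE choice are illustrative only.

import java.util.Collections;

import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

// The single setReplicationConfig() call replaces the old
// setType()/setFactor() pair: type and factor now travel together.
Pipeline pipeline = Pipeline.newBuilder()
    .setId(PipelineID.randomId())
    .setState(Pipeline.PipelineState.OPEN)
    .setReplicationConfig(new RatisReplicationConfig(ReplicationFactor.THREE))
    .setNodes(Collections.emptyList())
    .build();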
@@ -22,11 +22,11 @@
import java.util.List;
import java.util.Objects;

import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;

import com.google.common.base.Preconditions;

@@ -65,8 +65,8 @@ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids) {
return Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
.setType(HddsProtos.ReplicationType.STAND_ALONE)
.setFactor(HddsProtos.ReplicationFactor.ONE)
.setReplicationConfig(
new StandaloneReplicationConfig(ReplicationFactor.ONE))
.setNodes(dns)
.build();
}
@@ -81,8 +81,8 @@ public static Pipeline createRatisPipeline() {
return Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
.setType(ReplicationType.RATIS)
.setFactor(ReplicationFactor.THREE)
.setReplicationConfig(
new RatisReplicationConfig(ReplicationFactor.THREE))
.setNodes(nodes)
.build();
}
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.protocol;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfig;
@@ -59,8 +60,30 @@ public interface ScmBlockLocationProtocol extends Closeable {
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
List<AllocatedBlock> allocateBlock(long size, int numBlocks,
@Deprecated
default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationType type, ReplicationFactor factor, String owner,
ExcludeList excludeList) throws IOException {
return allocateBlock(size, numBlocks, ReplicationConfig
.fromTypeAndFactor(type, factor), owner, excludeList);
}

/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used for creating this block.
*
* @param size - size of the block.
* @param numBlocks - number of blocks.
* @param replicationConfig - replication configuration of the new block
* @param owner - service owner of the new block
* @param excludeList List of datanodes/containers to exclude during
*                    block allocation.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
List<AllocatedBlock> allocateBlock(long size, int numBlocks,
Review comment (Contributor):

Why is the owner fix included in this patch?

Reply (Member, Author):

Only javadoc fixes are included. It shows up here only because I modified the ReplicationConfig parameter on the same line.

Old code:

       ReplicationType type, ReplicationFactor factor, String owner,

New code:

       ReplicationConfig replicationConfig, String owner,

ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList) throws IOException;

/**
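To illustrate the compatibility story, a hedged caller sketch for the two overloads above; scmClient, blockSize, and the "ozone" owner string are placeholders, not names from this patch.

// Hypothetical caller; scmClient is assumed to implement
// ScmBlockLocationProtocol and blockSize is a placeholder size in bytes.

// Legacy call sites keep compiling: the @Deprecated default method converts
// the (type, factor) pair via ReplicationConfig.fromTypeAndFactor and
// forwards to the new overload.
List<AllocatedBlock> viaLegacy = scmClient.allocateBlock(
    blockSize, 1, ReplicationType.RATIS, ReplicationFactor.THREE,
    "ozone", new ExcludeList());

// New call sites pass the ReplicationConfig directly.
List<AllocatedBlock> viaConfig = scmClient.allocateBlock(
    blockSize, 1, new RatisReplicationConfig(ReplicationFactor.THREE),
    "ozone", new ExcludeList());

The default-method bridge means existing callers need no changes, while implementations only have to provide the new signature.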
@@ -24,6 +24,9 @@

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
@@ -139,29 +142,47 @@ private SCMBlockLocationResponse handleError(SCMBlockLocationResponse resp)
/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used for creating this block.
* @param size - size of the block.
* @param num - number of blocks.
* @param type - replication type of the blocks.
* @param factor - replication factor of the blocks.
* @param excludeList - exclude list while allocating blocks.
*
* @param size - size of the block.
* @param num - number of blocks.
* @param replicationConfig - replication configuration of the blocks.
* @param excludeList - exclude list while allocating blocks.
* @return allocated block accessing info (key, pipeline).
* @throws IOException
*/
@Override
public List<AllocatedBlock> allocateBlock(long size, int num,
HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
String owner, ExcludeList excludeList) throws IOException {
public List<AllocatedBlock> allocateBlock(
long size, int num,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList
) throws IOException {
Preconditions.checkArgument(size > 0, "block size must be greater than 0");

AllocateScmBlockRequestProto request =
final AllocateScmBlockRequestProto.Builder requestBuilder =
AllocateScmBlockRequestProto.newBuilder()
.setSize(size)
.setNumBlocks(num)
.setType(type)
.setFactor(factor)
.setType(replicationConfig.getReplicationType())
.setOwner(owner)
.setExcludeList(excludeList.getProtoBuf())
.build();
.setExcludeList(excludeList.getProtoBuf());

switch (replicationConfig.getReplicationType()) {
case STAND_ALONE:
requestBuilder.setFactor(
((StandaloneReplicationConfig) replicationConfig)
.getReplicationFactor());
break;
case RATIS:
requestBuilder.setFactor(
((RatisReplicationConfig) replicationConfig).getReplicationFactor());
break;
default:
throw new IllegalArgumentException(
"Unsupported replication type " + replicationConfig
.getReplicationType());
}

AllocateScmBlockRequestProto request = requestBuilder.build();

SCMBlockLocationRequest wrapper = createSCMBlockRequest(
Type.AllocateScmBlock)
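As a design note: Pipeline#getProtobufMessage earlier in this PR performs the same config-to-legacy-factor mapping with ReplicationConfig.getLegacyFactor. Assuming that helper rejects unsupported types the same way the switch above does, the switch could in principle collapse to this single hypothetical line:

// Hypothetical alternative to the STAND_ALONE/RATIS switch above.
requestBuilder.setFactor(ReplicationConfig.getLegacyFactor(replicationConfig));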
@@ -165,6 +165,7 @@ private ScmContainerLocationResponse submitRpcRequest(
*
* @param type - Replication Type
* @param factor - Replication Count
* @param owner - Service owner of the container.
*/
@Override
public ContainerWithPipeline allocateContainer(