Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,13 @@ ContainerDataProto readContainer(long containerID)
* @return ContainerInfo
* @throws IOException - in case of error.
*/
@Deprecated
ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor replicationFactor,
String owner) throws IOException;

/**
 * Creates a container on SCM using the given replication config.
 *
 * @param replicationConfig replication settings for the new container
 * @param owner owner of the new container
 * @return ContainerWithPipeline describing the created container
 * @throws IOException - in case of error.
 */
ContainerWithPipeline createContainer(ReplicationConfig replicationConfig, String owner) throws IOException;

/**
* Gets the list of underReplicated and unClosed containers on a decommissioning node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ ContainerWithPipeline allocateContainer(
HddsProtos.ReplicationFactor factor, String owner)
throws IOException;

/**
 * Asks SCM to allocate a container with the given replication config.
 *
 * @param replicationConfig replication settings for the new container
 * @param owner owner of the new container
 * @return ContainerWithPipeline describing the allocated container
 * @throws IOException - in case of error.
 */
ContainerWithPipeline allocateContainer(ReplicationConfig replicationConfig, String owner) throws IOException;

/**
* Ask SCM the location of the container. SCM responds with a group of
* nodes where this container and its replicas are located.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
Expand Down Expand Up @@ -222,13 +223,28 @@ private ScmContainerLocationResponse submitRpcRequest(
public ContainerWithPipeline allocateContainer(
    HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
    String owner) throws IOException {
  // Legacy (type, factor) entry point: convert to a ReplicationConfig and
  // delegate to the config-based overload.
  return allocateContainer(
      ReplicationConfig.fromProtoTypeAndFactor(type, factor), owner);
}

ContainerRequestProto request = ContainerRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setReplicationFactor(factor)
.setReplicationType(type)
.setOwner(owner)
.build();
@Override
public ContainerWithPipeline allocateContainer(
ReplicationConfig replicationConfig, String owner) throws IOException {

ContainerRequestProto.Builder request = ContainerRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setReplicationType(replicationConfig.getReplicationType())
.setOwner(owner);

if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) {
HddsProtos.ECReplicationConfig ecProto =
((ECReplicationConfig) replicationConfig).toProto();
request.setEcReplicationConfig(ecProto);
request.setReplicationFactor(ReplicationFactor.ONE); // Set for backward compatibility, ignored for EC.
} else {
request.setReplicationFactor(ReplicationFactor.valueOf(replicationConfig.getReplication()));
}

ContainerResponseProto response =
submitRequest(Type.AllocateContainer,
Expand All @@ -239,8 +255,7 @@ public ContainerWithPipeline allocateContainer(
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate container failed.");
}
return ContainerWithPipeline.fromProtobuf(
response.getContainerWithPipeline());
return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ enum Type {
*/
message ContainerRequestProto {
// Ozone only support replication of either 1 or 3.
required ReplicationFactor replicationFactor = 2;
optional ReplicationFactor replicationFactor = 2;
required ReplicationType replicationType = 3;
required string owner = 4;
optional string traceID = 5;
optional ECReplicationConfig ecReplicationConfig = 6;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ public ContainerInfo allocateContainer(
if (pipelines.isEmpty()) {
try {
pipeline = pipelineManager.createPipeline(replicationConfig);
if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) {
pipelineManager.openPipeline(pipeline.getId());
}
pipelineManager.waitPipelineReady(pipeline.getId(), 0);
} catch (IOException e) {
scmContainerManagerMetrics.incNumFailureCreateContainers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,9 +753,11 @@ public GetContainerReplicasResponseProto getContainerReplicas(

public ContainerResponseProto allocateContainer(ContainerRequestProto request,
int clientVersion) throws IOException {
ContainerWithPipeline cp = impl
.allocateContainer(request.getReplicationType(),
request.getReplicationFactor(), request.getOwner());
ReplicationConfig replicationConfig = ReplicationConfig.fromProto(request.getReplicationType(),
request.getReplicationFactor(),
request.getEcReplicationConfig()
);
ContainerWithPipeline cp = impl.allocateContainer(replicationConfig, request.getOwner());
return ContainerResponseProto.newBuilder()
.setContainerWithPipeline(cp.getProtobuf(clientVersion))
.setErrorCode(ContainerResponseProto.Error.success)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,16 @@ public void join() throws InterruptedException {
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
    replicationType, HddsProtos.ReplicationFactor factor,
    String owner) throws IOException {
  // Legacy (type, factor) entry point: translate the proto pair into a
  // ReplicationConfig and delegate to the config-based overload.
  return allocateContainer(
      ReplicationConfig.fromProtoTypeAndFactor(replicationType, factor),
      owner);
}

@Override
public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConfig, String owner) throws IOException {
Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("replicationType", String.valueOf(replicationType));
auditMap.put("factor", String.valueOf(factor));
auditMap.put("replicationType", String.valueOf(replicationConfig.getReplicationType()));
auditMap.put("replication", String.valueOf(replicationConfig.getReplication()));
auditMap.put("owner", String.valueOf(owner));

try {
Expand All @@ -243,9 +249,7 @@ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
}
getScm().checkAdminAccess(getRemoteUser(), false);
final ContainerInfo container = scm.getContainerManager()
.allocateContainer(
ReplicationConfig.fromProtoTypeAndFactor(replicationType, factor),
owner);
.allocateContainer(replicationConfig, owner);
final Pipeline pipeline = scm.getPipelineManager()
.getPipeline(container.getPipelineID());
ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,18 @@ public String getEncodedContainerToken(long containerId) throws IOException {
@Override
public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type,
    HddsProtos.ReplicationFactor factor, String owner) throws IOException {
  // Convert the legacy (type, factor) pair into a ReplicationConfig and
  // delegate to the config-based overload.
  // Fix: the previous code referenced an undefined variable
  // "replicationType"; the parameter is named "type".
  ReplicationConfig replicationConfig =
      ReplicationConfig.fromProtoTypeAndFactor(type, factor);
  return createContainer(replicationConfig, owner);
}

@Override
public ContainerWithPipeline createContainer(ReplicationConfig replicationConfig, String owner) throws IOException {
XceiverClientSpi client = null;
XceiverClientManager clientManager = getXceiverClientManager();
try {
// allocate container on SCM.
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(type, factor,
owner);
ContainerWithPipeline containerWithPipeline =
storageContainerLocationClient.allocateContainer(replicationConfig, owner);
Pipeline pipeline = containerWithPipeline.getPipeline();
// connect to pipeline leader and allocate container on leader datanode.
client = clientManager.acquireClient(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@

import java.io.IOException;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.ozone.shell.ShellReplicationOptions;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

Expand All @@ -30,7 +35,8 @@
*/
@Command(
name = "create",
description = "Create container",
description = "Create container. If no replication config provided, " +
"defaults to STAND_ALONE with replication factor ONE.",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class CreateSubcommand extends ScmSubcommand {
Expand All @@ -39,10 +45,19 @@ public class CreateSubcommand extends ScmSubcommand {
names = { "-o", "--owner"})
private String owner;

@CommandLine.Mixin
private ShellReplicationOptions containerReplicationOptions;

@Override
public void execute(ScmClient scmClient) throws IOException {
  // Resolve the replication config from the CLI options; when none was
  // supplied, fall back to the documented default of STAND_ALONE / ONE.
  ReplicationConfig replicationConfig =
      containerReplicationOptions.fromParamsOrConfig(new OzoneConfiguration());
  if (replicationConfig == null) {
    replicationConfig = ReplicationConfig.fromProtoTypeAndFactor(
        HddsProtos.ReplicationType.STAND_ALONE,
        HddsProtos.ReplicationFactor.ONE);
  }
  ContainerWithPipeline container =
      scmClient.createContainer(replicationConfig, owner);
  System.out.printf("Container %s is created with replication config %s.%n",
      container.getContainerInfo().getContainerID(), replicationConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.utils.IOUtils;
Expand Down Expand Up @@ -73,4 +77,23 @@ public void testAllocateNull() {
SCMTestUtils.getReplicationType(conf),
SCMTestUtils.getReplicationFactor(conf), null));
}

@Test
public void testAllocateRatis() throws Exception {
  // Allocation with a RATIS/THREE replication config should succeed.
  ReplicationConfig ratisThree =
      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
  testAllocateContainer(ratisThree);
}

@Test
public void testAllocateEC() throws Exception {
  // Allocation with an EC(3, 2) replication config should succeed.
  testAllocateContainer(new ECReplicationConfig(3, 2));
}

// Allocates a container with the given replication config and verifies the
// returned container carries a pipeline with at least one datanode.
private void testAllocateContainer(ReplicationConfig replicationConfig) throws Exception {
  ContainerWithPipeline allocated = storageContainerLocationClient
      .allocateContainer(replicationConfig, OzoneConsts.OZONE);

  assertNotNull(allocated);
  assertNotNull(allocated.getPipeline().getFirstNode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
Expand Down Expand Up @@ -265,4 +268,21 @@ public void testNodeOperationalStates() throws Exception {
nm.setNodeOperationalState(node, originalState);
}
}

@Test
public void testCreateRatis() throws Exception {
  // Container creation with a RATIS/THREE replication config should succeed.
  ReplicationConfig ratisThree =
      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
  testCreateContainer(ratisThree);
}

@Test
public void testCreateEC() throws Exception {
  // Container creation with an EC(3, 2) replication config should succeed.
  testCreateContainer(new ECReplicationConfig(3, 2));
}

// Creates a container with the given replication config and verifies it can
// be looked up again by its container ID.
private void testCreateContainer(ReplicationConfig replicationConfig) throws Exception {
  ContainerWithPipeline created =
      storageClient.createContainer(replicationConfig, OzoneConsts.OZONE);
  long containerId = created.getContainerInfo().getContainerID();
  assertEquals(containerId,
      storageClient.getContainer(containerId).getContainerID());
}
}
Loading