diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 979451194de6..30ecc52cbf37 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -179,10 +179,13 @@ ContainerDataProto readContainer(long containerID) * @return ContainerInfo * @throws IOException - in case of error. */ + @Deprecated ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor replicationFactor, String owner) throws IOException; + ContainerWithPipeline createContainer(ReplicationConfig replicationConfig, String owner) throws IOException; + /** * Gets the list of underReplicated and unClosed containers on a decommissioning node. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 73ef288ad4c5..ace1d4751821 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -81,6 +81,8 @@ ContainerWithPipeline allocateContainer( HddsProtos.ReplicationFactor factor, String owner) throws IOException; + ContainerWithPipeline allocateContainer(ReplicationConfig replicationConfig, String owner) throws IOException; + /** * Ask SCM the location of the container. SCM responds with a group of * nodes where this container and its replicas are located. diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 0f38716afbb9..463b9f54a609 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; @@ -222,13 +223,28 @@ private ScmContainerLocationResponse submitRpcRequest( public ContainerWithPipeline allocateContainer( HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, String owner) throws IOException { + ReplicationConfig replicationConfig = + ReplicationConfig.fromProtoTypeAndFactor(type, factor); + return allocateContainer(replicationConfig, owner); + } - ContainerRequestProto request = ContainerRequestProto.newBuilder() - .setTraceID(TracingUtil.exportCurrentSpan()) - .setReplicationFactor(factor) - .setReplicationType(type) - .setOwner(owner) - .build(); + @Override + public ContainerWithPipeline allocateContainer( + ReplicationConfig replicationConfig, String owner) throws IOException { + + ContainerRequestProto.Builder request = ContainerRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setReplicationType(replicationConfig.getReplicationType()) + .setOwner(owner); + + if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) { + HddsProtos.ECReplicationConfig ecProto = + ((ECReplicationConfig) replicationConfig).toProto(); + request.setEcReplicationConfig(ecProto); + request.setReplicationFactor(ReplicationFactor.ONE); // Set for backward compatibility, ignored for EC. + } else { + request.setReplicationFactor(ReplicationFactor.valueOf(replicationConfig.getReplication())); + } ContainerResponseProto response = submitRequest(Type.AllocateContainer, @@ -239,8 +255,7 @@ public ContainerWithPipeline allocateContainer( throw new IOException(response.hasErrorMessage() ? response.getErrorMessage() : "Allocate container failed."); } - return ContainerWithPipeline.fromProtobuf( - response.getContainerWithPipeline()); + return ContainerWithPipeline.fromProtobuf(response.getContainerWithPipeline()); } @Override diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index b487c7c7cecb..3818429a2fe1 100644 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@ -203,10 +203,11 @@ enum Type { */ message ContainerRequestProto { // Ozone only support replication of either 1 or 3. - required ReplicationFactor replicationFactor = 2; + optional ReplicationFactor replicationFactor = 2; required ReplicationType replicationType = 3; required string owner = 4; optional string traceID = 5; + optional ECReplicationConfig ecReplicationConfig = 6; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 2e6bfa6f3dd3..5622a6fac189 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -194,6 +194,9 @@ public ContainerInfo allocateContainer( if (pipelines.isEmpty()) { try { pipeline = pipelineManager.createPipeline(replicationConfig); + if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) { + pipelineManager.openPipeline(pipeline.getId()); + } pipelineManager.waitPipelineReady(pipeline.getId(), 0); } catch (IOException e) { scmContainerManagerMetrics.incNumFailureCreateContainers(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java index 4b9961184257..44042458fc35 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -753,9 +753,11 @@ public GetContainerReplicasResponseProto getContainerReplicas( public ContainerResponseProto allocateContainer(ContainerRequestProto request, int clientVersion) throws IOException { - ContainerWithPipeline cp = impl - .allocateContainer(request.getReplicationType(), - request.getReplicationFactor(), request.getOwner()); + ReplicationConfig replicationConfig = ReplicationConfig.fromProto(request.getReplicationType(), + request.getReplicationFactor(), + request.getEcReplicationConfig() + ); + ContainerWithPipeline cp = impl.allocateContainer(replicationConfig, request.getOwner()); return ContainerResponseProto.newBuilder() .setContainerWithPipeline(cp.getProtobuf(clientVersion)) .setErrorCode(ContainerResponseProto.Error.success) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index b9b5a1920b37..ac7574f088bf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -230,10 +230,16 @@ public void join() throws InterruptedException { public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType replicationType, HddsProtos.ReplicationFactor factor, String owner) throws IOException { + ReplicationConfig replicationConfig = + ReplicationConfig.fromProtoTypeAndFactor(replicationType, factor); + return allocateContainer(replicationConfig, owner); + } + @Override + public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConfig, String owner) throws IOException { Map auditMap = Maps.newHashMap(); - auditMap.put("replicationType", String.valueOf(replicationType)); - auditMap.put("factor", String.valueOf(factor)); + auditMap.put("replicationType", String.valueOf(replicationConfig.getReplicationType())); + auditMap.put("replication", String.valueOf(replicationConfig.getReplication())); auditMap.put("owner", String.valueOf(owner)); try { @@ -243,9 +249,7 @@ public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType } getScm().checkAdminAccess(getRemoteUser(), false); final ContainerInfo container = scm.getContainerManager() - .allocateContainer( - ReplicationConfig.fromProtoTypeAndFactor(replicationType, factor), - owner); + .allocateContainer(replicationConfig, owner); final Pipeline pipeline = scm.getPipelineManager() .getPipeline(container.getPipelineID()); ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline); diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java index 5479a8c4f895..464d3bec24a0 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java @@ -201,13 +201,18 @@ public String getEncodedContainerToken(long containerId) throws IOException { @Override public ContainerWithPipeline createContainer(HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, String owner) throws IOException { + ReplicationConfig replicationConfig = + ReplicationConfig.fromProtoTypeAndFactor(replicationType, factor); + return createContainer(replicationConfig, owner); + } + + @Override + public ContainerWithPipeline createContainer(ReplicationConfig replicationConfig, String owner) throws IOException { XceiverClientSpi client = null; XceiverClientManager clientManager = getXceiverClientManager(); try { - // allocate container on SCM. - ContainerWithPipeline containerWithPipeline = - storageContainerLocationClient.allocateContainer(type, factor, - owner); + ContainerWithPipeline containerWithPipeline = + storageContainerLocationClient.allocateContainer(replicationConfig, owner); Pipeline pipeline = containerWithPipeline.getPipeline(); // connect to pipeline leader and allocate container on leader datanode. client = clientManager.acquireClient(pipeline); diff --git a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CreateSubcommand.java b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CreateSubcommand.java index 34397b5f5128..96038827616c 100644 --- a/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CreateSubcommand.java +++ b/hadoop-ozone/cli-admin/src/main/java/org/apache/hadoop/hdds/scm/cli/container/CreateSubcommand.java @@ -19,9 +19,14 @@ import java.io.IOException; import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.cli.ScmSubcommand; import org.apache.hadoop.hdds.scm.client.ScmClient; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.ozone.shell.ShellReplicationOptions; +import picocli.CommandLine; import picocli.CommandLine.Command; import picocli.CommandLine.Option; @@ -30,7 +35,8 @@ */ @Command( name = "create", - description = "Create container", + description = "Create container. If no replication config provided, " + + "defaults to STAND_ALONE with replication factor ONE.", mixinStandardHelpOptions = true, versionProvider = HddsVersionProvider.class) public class CreateSubcommand extends ScmSubcommand { @@ -39,10 +45,19 @@ public class CreateSubcommand extends ScmSubcommand { names = { "-o", "--owner"}) private String owner; + @CommandLine.Mixin + private ShellReplicationOptions containerReplicationOptions; + @Override public void execute(ScmClient scmClient) throws IOException { - ContainerWithPipeline container = scmClient.createContainer(owner); - System.out.printf("Container %s is created.%n", - container.getContainerInfo().getContainerID()); + ReplicationConfig replicationConfig = containerReplicationOptions.fromParamsOrConfig(new OzoneConfiguration()); + if (replicationConfig == null) { + // if replication options not provided via command then by default STAND_ALONE container will be created. + replicationConfig = ReplicationConfig.fromProtoTypeAndFactor(HddsProtos.ReplicationType.STAND_ALONE, + HddsProtos.ReplicationFactor.ONE); + } + ContainerWithPipeline container = scmClient.createContainer(replicationConfig, owner); + System.out.printf("Container %s is created with replication config %s.%n", + container.getContainerInfo().getContainerID(), replicationConfig); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestAllocateContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestAllocateContainer.java index 60e329790fa8..3ceca40041b1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestAllocateContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestAllocateContainer.java @@ -20,7 +20,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.utils.IOUtils; @@ -73,4 +77,23 @@ public void testAllocateNull() { SCMTestUtils.getReplicationType(conf), SCMTestUtils.getReplicationFactor(conf), null)); } + + @Test + public void testAllocateRatis() throws Exception { + testAllocateContainer(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)); + } + + @Test + public void testAllocateEC() throws Exception { + ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2); + testAllocateContainer(ecReplicationConfig); + } + + private void testAllocateContainer(ReplicationConfig replicationConfig) throws Exception { + ContainerWithPipeline container = + storageContainerLocationClient.allocateContainer(replicationConfig, OzoneConsts.OZONE); + + assertNotNull(container); + assertNotNull(container.getPipeline().getFirstNode()); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerOperations.java index 492cf1e56385..2bc127a8f3d3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerOperations.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerOperations.java @@ -34,6 +34,9 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -265,4 +268,21 @@ public void testNodeOperationalStates() throws Exception { nm.setNodeOperationalState(node, originalState); } } + + @Test + public void testCreateRatis() throws Exception { + testCreateContainer(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)); + } + + @Test + public void testCreateEC() throws Exception { + ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2); + testCreateContainer(ecConfig); + } + + private void testCreateContainer(ReplicationConfig replicationConfig) throws Exception { + ContainerWithPipeline container = storageClient.createContainer(replicationConfig, OzoneConsts.OZONE); + assertEquals(container.getContainerInfo().getContainerID(), + storageClient.getContainer(container.getContainerInfo().getContainerID()).getContainerID()); + } }