Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,27 @@ public class ScmConfig {
"The full name of class which implements "
+ "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ "The class decides which pipeline will be used to find or "
+ "allocate container. If not set, "
+ "allocate Ratis containers. If not set, "
+ "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
+ "RandomPipelineChoosePolicy will be used as default value."
)
private String pipelineChoosePolicyName;

@Config(key = "ec.pipeline.choose.policy.impl",
type = ConfigType.STRING,
defaultValue = "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms" +
".RandomPipelineChoosePolicy",
tags = { ConfigTag.SCM, ConfigTag.PIPELINE },
description =
"The full name of class which implements "
+ "org.apache.hadoop.hdds.scm.PipelineChoosePolicy. "
+ "The class decides which pipeline will be used when "
+ "selecting an EC Pipeline. If not set, "
+ "org.apache.hadoop.hdds.scm.pipeline.choose.algorithms. "
+ "RandomPipelineChoosePolicy will be used as default value."
)
private String ecPipelineChoosePolicyName;

@Config(key = "block.deletion.per-interval.max",
type = ConfigType.INT,
defaultValue = "100000",
Expand Down Expand Up @@ -138,6 +153,10 @@ public void setPipelineChoosePolicyName(String pipelineChoosePolicyName) {
this.pipelineChoosePolicyName = pipelineChoosePolicyName;
}

public void setECPipelineChoosePolicyName(String policyName) {
this.ecPipelineChoosePolicyName = policyName;
}

public void setBlockDeletionLimit(int blockDeletionLimit) {
this.blockDeletionLimit = blockDeletionLimit;
}
Expand All @@ -158,6 +177,10 @@ public String getPipelineChoosePolicyName() {
return pipelineChoosePolicyName;
}

public String getECPipelineChoosePolicyName() {
return ecPipelineChoosePolicyName;
}

public int getBlockDeletionLimit() {
return blockDeletionLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public WritableContainerFactory(StorageContainerManager scm) {
scm.getScmNodeManager(),
scm.getPipelineManager(),
scm.getContainerManager(),
scm.getPipelineChoosePolicy());
scm.getEcPipelineChoosePolicy());
}

public ContainerInfo getContainer(final long size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
Expand All @@ -41,25 +40,31 @@ public final class PipelineChoosePolicyFactory {
OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT =
RandomPipelineChoosePolicy.class;

@VisibleForTesting
public static final Class<? extends PipelineChoosePolicy>
OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT =
RandomPipelineChoosePolicy.class;

private PipelineChoosePolicyFactory() {
}

public static PipelineChoosePolicy getPolicy(
ConfigurationSource conf) throws SCMException {
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
Class<? extends PipelineChoosePolicy> policyClass = getClass(
scmConfig.getPipelineChoosePolicyName(), PipelineChoosePolicy.class);

ScmConfig scmConfig, boolean forEC) throws SCMException {
Class<? extends PipelineChoosePolicy> policyClass = null;
String policyName = forEC ? scmConfig.getECPipelineChoosePolicyName() :
scmConfig.getPipelineChoosePolicyName();
try {
policyClass = getClass(policyName, PipelineChoosePolicy.class);
return createPipelineChoosePolicyFromClass(policyClass);
} catch (Exception e) {
if (policyClass != OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT) {
Class<? extends PipelineChoosePolicy> defaultPolicy = forEC ?
OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT :
OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT;
if (policyClass == null || policyClass != defaultPolicy) {
LOG.error("Met an exception while create pipeline choose policy "
+ "for the given class " + policyClass.getName()
+ ". Fallback to the default pipeline choose policy "
+ OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT, e);
return createPipelineChoosePolicyFromClass(
OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT);
+ "for the given class {}. Fallback to the default pipeline "
+ " choose policy {}", policyName, defaultPolicy, e);
return createPipelineChoosePolicyFromClass(defaultPolicy);
}
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
*/
private NetworkTopology clusterMap;
private PipelineChoosePolicy pipelineChoosePolicy;
private PipelineChoosePolicy ecPipelineChoosePolicy;
private SecurityConfig securityConfig;

private final SCMHANodeDetails scmHANodeDetails;
Expand Down Expand Up @@ -764,7 +765,11 @@ private void initializeSystemManagers(OzoneConfiguration conf,
containerReplicaPendingOps);
}

pipelineChoosePolicy = PipelineChoosePolicyFactory.getPolicy(conf);
ScmConfig scmConfig = conf.getObject(ScmConfig.class);
pipelineChoosePolicy = PipelineChoosePolicyFactory
.getPolicy(scmConfig, false);
ecPipelineChoosePolicy = PipelineChoosePolicyFactory
.getPolicy(scmConfig, true);
if (configurator.getWritableContainerFactory() != null) {
writableContainerFactory = configurator.getWritableContainerFactory();
} else {
Expand Down Expand Up @@ -2010,6 +2015,10 @@ public PipelineChoosePolicy getPipelineChoosePolicy() {
return this.pipelineChoosePolicy;
}

public PipelineChoosePolicy getEcPipelineChoosePolicy() {
return this.ecPipelineChoosePolicy;
}

@Override
public String getScmId() {
return getScmStorageConfig().getScmId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.junit.jupiter.api.Assertions;
Expand All @@ -31,6 +30,7 @@
import java.io.IOException;
import java.util.List;

import static org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory.OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT;
import static org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory.OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT;

/**
Expand All @@ -52,14 +52,31 @@ public void setup() {
@Test
public void testDefaultPolicy() throws IOException {
PipelineChoosePolicy policy = PipelineChoosePolicyFactory
.getPolicy(conf);
.getPolicy(scmConfig, false);
Assertions.assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}

@Test
public void testDefaultPolicyEC() throws IOException {
PipelineChoosePolicy policy = PipelineChoosePolicyFactory
.getPolicy(scmConfig, true);
Assertions.assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}

@Test
public void testNonDefaultPolicyEC() throws IOException {
scmConfig.setECPipelineChoosePolicyName(DummyGoodImpl.class.getName());
PipelineChoosePolicy policy = PipelineChoosePolicyFactory
.getPolicy(scmConfig, true);
Assertions.assertSame(DummyGoodImpl.class, policy.getClass());
}


/**
* A dummy pipeline choose policy implementation for test.
* A dummy pipeline choose policy implementation for test with an invalid
* constructor.
*/
public static class DummyImpl implements PipelineChoosePolicy {

Expand All @@ -79,22 +96,52 @@ public int choosePipelineIndex(List<Pipeline> pipelineList,
}
}

/**
* A dummy pipeline choose policy implementation for test that is valid.
*/
public static class DummyGoodImpl implements PipelineChoosePolicy {

@Override
public Pipeline choosePipeline(List<Pipeline> pipelineList,
PipelineRequestInformation pri) {
return null;
}

@Override
public int choosePipelineIndex(List<Pipeline> pipelineList,
PipelineRequestInformation pri) {
return -1;
}
}


@Test
public void testConstuctorNotFound() throws SCMException {
public void testConstructorNotFound() throws SCMException {
// set a policy class which does't have the right constructor implemented
scmConfig.setPipelineChoosePolicyName(DummyImpl.class.getName());
PipelineChoosePolicy policy = PipelineChoosePolicyFactory.getPolicy(conf);
scmConfig.setECPipelineChoosePolicyName(DummyImpl.class.getName());
PipelineChoosePolicy policy =
PipelineChoosePolicyFactory.getPolicy(scmConfig, false);
Assertions.assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true);
Assertions.assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}

@Test
public void testClassNotImplemented() throws SCMException {
// set a placement class not implemented
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
scmConfig.setPipelineChoosePolicyName(
"org.apache.hadoop.hdds.scm.pipeline.choose.policy.HelloWorld");
scmConfig.setECPipelineChoosePolicyName(
"org.apache.hadoop.hdds.scm.pipeline.choose.policy.HelloWorld");
PipelineChoosePolicy policy = PipelineChoosePolicyFactory.getPolicy(conf);
PipelineChoosePolicy policy =
PipelineChoosePolicyFactory.getPolicy(scmConfig, false);
Assertions.assertSame(OZONE_SCM_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
policy = PipelineChoosePolicyFactory.getPolicy(scmConfig, true);
Assertions.assertSame(OZONE_SCM_EC_PIPELINE_CHOOSE_POLICY_IMPL_DEFAULT,
policy.getClass());
}
}