Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public final class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;

PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
Configuration conf) {
Configuration conf, RatisPipelineUtils ratisPipelineUtils) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharatviswa504 Thanks for adding the changes! An instance of RatisPipelineUtils does not seem right as it is a utility class. Could we move the functions of RatisPipelineUtils to RatisPipelineProvider instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager));
providers.put(ReplicationType.RATIS,
new RatisPipelineProvider(nodeManager, stateManager, conf));
new RatisPipelineProvider(nodeManager, stateManager, conf, ratisPipelineUtils));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
Expand Down Expand Up @@ -45,12 +46,21 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
private final RatisPipelineUtils ratisPipelineUtils;

RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf) {
PipelineStateManager stateManager, Configuration conf,
RatisPipelineUtils ratisPipelineUtils) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
this.conf = conf;
this.ratisPipelineUtils = ratisPipelineUtils;
}

@VisibleForTesting
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf) {
this(nodeManager, stateManager, conf, null);
}

/**
Expand Down Expand Up @@ -134,6 +144,6 @@ public Pipeline create(ReplicationFactor factor,
}

protected void initializePipeline(Pipeline pipeline) throws IOException {
RatisPipelineUtils.createPipeline(pipeline, conf);
ratisPipelineUtils.createPipeline(pipeline, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,18 @@ public final class RatisPipelineUtils {
LoggerFactory.getLogger(RatisPipelineUtils.class);

// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private static final int PARALLELISIM_FOR_POOL = 3;
private final int parallelisimForPool = 3;

private static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY =
private final ForkJoinPool.ForkJoinWorkerThreadFactory factory =
(forkJoinPool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.
defaultForkJoinWorkerThreadFactory.newThread(forkJoinPool);
worker.setName("ratisCreatePipeline" + worker.getPoolIndex());
return worker;
});

public static final ForkJoinPool POOL = new ForkJoinPool(
PARALLELISIM_FOR_POOL, FACTORY, null, false);


private RatisPipelineUtils() {
}
public final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelisimForPool, factory, null, false);

/**
* Sends ratis command to create pipeline on all the datanodes.
Expand All @@ -80,7 +76,7 @@ private RatisPipelineUtils() {
* @param ozoneConf - Ozone Confinuration
* @throws IOException if creation fails
*/
public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
public void createPipeline(Pipeline pipeline, Configuration ozoneConf)
throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
Expand Down Expand Up @@ -145,7 +141,7 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
}

private static void callRatisRpc(List<DatanodeDetails> datanodes,
private void callRatisRpc(List<DatanodeDetails> datanodes,
Configuration ozoneConf,
CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
throws IOException {
Expand All @@ -166,7 +162,7 @@ private static void callRatisRpc(List<DatanodeDetails> datanodes,
final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(ozoneConf);
try {
POOL.submit(() -> {
forkJoinPool.submit(() -> {
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
Expand Down Expand Up @@ -196,4 +192,8 @@ private static void callRatisRpc(List<DatanodeDetails> datanodes,
throw MultipleIOException.createIOException(exceptions);
}
}

public void shutdown() {
forkJoinPool.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -83,11 +84,12 @@ public class SCMPipelineManager implements PipelineManager {
private ObjectName pmInfoBean;

public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher) throws IOException {
EventPublisher eventPublisher, RatisPipelineUtils ratisPipelineUtils) throws IOException {
this.lock = new ReentrantReadWriteLock();
this.conf = conf;
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
conf, ratisPipelineUtils);
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
Expand All @@ -111,6 +113,12 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
initializePipelineState();
}

@VisibleForTesting
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher) throws IOException {
this(conf, nodeManager, eventPublisher, null);
}

public PipelineStateManager getStateManager() {
return stateManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final SafeModeHandler safeModeHandler;
private SCMContainerMetrics scmContainerMetrics;

private RatisPipelineUtils ratisPipelineUtils;

/**
* Creates a new StorageContainerManager. Configuration will be
* updated with information on the actual listening addresses used
Expand Down Expand Up @@ -399,8 +401,10 @@ private void initalizeSystemManagers(OzoneConfiguration conf,
if (configurator.getPipelineManager() != null) {
pipelineManager = configurator.getPipelineManager();
} else {
ratisPipelineUtils = new RatisPipelineUtils();
pipelineManager =
new SCMPipelineManager(conf, scmNodeManager, eventQueue);
new SCMPipelineManager(conf, scmNodeManager, eventQueue,
ratisPipelineUtils);
}

if (configurator.getContainerManager() != null) {
Expand Down Expand Up @@ -1020,7 +1024,7 @@ public void stop() {
}

// shutdown RatisPipelineUtils pool.
RatisPipelineUtils.POOL.shutdownNow();
ratisPipelineUtils.shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,15 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
}

// try creating another pipeline now
RatisPipelineUtils ratisPipelineUtils = new RatisPipelineUtils();
try {
RatisPipelineUtils.createPipeline(pipelines.get(0), conf);
ratisPipelineUtils.createPipeline(pipelines.get(0), conf);
Assert.fail("pipeline creation should fail after shutting down pipeline");
} catch (IOException ioe) {
// in case the pipeline creation fails, MultipleIOException is thrown
Assert.assertTrue(ioe instanceof MultipleIOException);
} finally {
ratisPipelineUtils.shutdown();
}

// make sure pipelines is destroyed
Expand Down