Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@ public Pipeline create(ReplicationType type, ReplicationFactor factor,
List<DatanodeDetails> nodes) {
return providers.get(type).create(factor, nodes);
}

@VisibleForTesting
public PipelineProvider getProvider(ReplicationType type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to expose this api for now. In the test where it is used, we can call pipelineManager.createPipeline instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return providers.get(type);
}

/**
 * Shuts down every pipeline provider held by this factory.
 */
public void shutdown() {
  for (PipelineProvider registered : providers.values()) {
    registered.shutdown();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
void startPipelineCreator();

void triggerPipelineCreation();

PipelineFactory getPipelineFactory();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to expose this API either. The shutdown for pipelineFactory should be called from PipelineManager itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ public interface PipelineProvider {

Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);

void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,75 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;

/**
* Implements Api for creating ratis pipelines.
*/
public class RatisPipelineProvider implements PipelineProvider {

private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineProvider.class);

private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;

// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelisimForPool = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a typo. parallelis'i'mForPool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private final ForkJoinPool.ForkJoinWorkerThreadFactory factory =
(pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.
defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("ratisCreatePipeline" + worker.getPoolIndex());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT - "ratisCreatePipeline" - Can we make it all capital?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return worker;
});

private final ForkJoinPool forkJoinPool = new ForkJoinPool(
parallelisimForPool, factory, null, false);


/**
 * Creates a provider that keeps references to the given collaborators.
 *
 * @param nodeManager node manager stored for later use by this provider
 * @param stateManager pipeline state manager stored for later use
 * @param conf configuration stored for later use
 */
RatisPipelineProvider(NodeManager nodeManager,
    PipelineStateManager stateManager, Configuration conf) {
  this.conf = conf;
  this.stateManager = stateManager;
  this.nodeManager = nodeManager;
}


/**
* Create pluggable container placement policy implementation instance.
*
Expand Down Expand Up @@ -133,7 +173,86 @@ public Pipeline create(ReplicationFactor factor,
.build();
}


@Override
public void shutdown() {
forkJoinPool.shutdownNow();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also wait for the tasks to finish. We need to use awaitTermination call. We can use timeout of 60 seconds? That is what is used in Scheduler class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done based on Arpit's comment: on an unclean shutdown this terminates abruptly anyway, so we can use shutdownNow() instead of awaitTermination in the normal case too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharatviswa504 I agree. We need to use shutdownNow but we also need to use awaitTermination. shutdownNow would interrupt the running tasks but the running task should handle the interrupt. If the task does not exit on interrupt, it is a better idea to wait for the task to finish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
 * Initializes the given pipeline by issuing the create-pipeline calls.
 *
 * NOTE(review): as shown here, both the static
 * RatisPipelineUtils.createPipeline(pipeline, conf) and this class's own
 * createPipeline(pipeline) are invoked. This looks like the old and the new
 * line of a diff displayed together - confirm which call is intended.
 *
 * @param pipeline pipeline to initialize
 * @throws IOException declared by both create calls
 */
protected void initializePipeline(Pipeline pipeline) throws IOException {
RatisPipelineUtils.createPipeline(pipeline, conf);
createPipeline(pipeline);
}

/**
* Sends ratis command to create pipeline on all the datanodes.
*
* @param pipeline - Pipeline to be created
* @throws IOException if creation fails
*/
public void createPipeline(Pipeline pipeline)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this fn and replace it with initializePipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
callRatisRpc(pipeline.getNodes(),
(raftClient, peer) -> {
RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
if (reply == null || !reply.isSuccess()) {
String msg = "Pipeline initialization failed for pipeline:"
+ pipeline.getId() + " node:" + peer.getId();
LOG.error(msg);
throw new IOException(msg);
}
});
}

private void callRatisRpc(List<DatanodeDetails> datanodes,
CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
throws IOException {
if (datanodes.isEmpty()) {
return;
}

final String rpcType = conf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
final List< IOException > exceptions =
Collections.synchronizedList(new ArrayList<>());
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(conf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(conf));
final TimeDuration requestTimeout =
RatisHelper.getClientRequestTimeout(conf);
try {
forkJoinPool.submit(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please verify that none of the threads are waiting for the parallel stream call to finish?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharatviswa504 Can you please verify this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lokeshj1703 Sorry missed this comment earlier.
Checked this: one of the forkJoinPool threads is used for waiting, and the same thread is also used for one of the Ratis calls when creating a 3-node pipeline.

Output:
The below line is from after Submit.
Thread name RATISCREATEPIPELINE1
forkJoinPool.submit(() -> {
These below log lines are inside ParallelStream
datanodes.parallelStream().forEach(d -> {
Internal thread name RATISCREATEPIPELINE1
Internal thread name RATISCREATEPIPELINE3
Internal thread name RATISCREATEPIPELINE2

So, I think we should be fine with parallelism set to 3. I even tried with 4, but I still see the same above output.

datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, maxOutstandingRequests, tlsConfig,
requestTimeout)) {
rpc.accept(client, p);
} catch (IOException ioe) {
String errMsg =
"Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
LOG.error(errMsg, ioe);
exceptions.add(new IOException(errMsg, ioe));
}
});
}).get();
} catch (ExecutionException | RejectedExecutionException ex) {
LOG.error(ex.getClass().getName() + " exception occurred during " +
"createPipeline", ex);
throw new IOException(ex.getClass().getName() + " exception occurred " +
"during createPipeline", ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Interrupt exception occurred during " +
"createPipeline", ex);
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,66 +17,37 @@
*/
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Utility class for Ratis pipelines. Contains methods to create and destroy
* ratis pipelines.
*/
final class RatisPipelineUtils {
public final class RatisPipelineUtils {

private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineUtils.class);

private RatisPipelineUtils() {
}

/**
 * Sends the ratis group-add command for a pipeline to each of its datanodes.
 *
 * @param pipeline - Pipeline to be created
 * @param ozoneConf - Ozone Configuration
 * @throws IOException if creation fails
 */
public static void createPipeline(Pipeline pipeline, Configuration ozoneConf)
    throws IOException {
  final RaftGroup raftGroup = RatisHelper.newRaftGroup(pipeline);
  LOG.debug("creating pipeline:{} with {}", pipeline.getId(), raftGroup);

  // Per-datanode call: add the group on the peer and fail on a missing or
  // unsuccessful reply.
  final CheckedBiConsumer<RaftClient, RaftPeer, IOException> addGroup =
      (client, raftPeer) -> {
        final RaftClientReply response =
            client.groupAdd(raftGroup, raftPeer.getId());
        if (response != null && response.isSuccess()) {
          return;
        }
        final String failure = "Pipeline initialization failed for pipeline:"
            + pipeline.getId() + " node:" + raftPeer.getId();
        LOG.error(failure);
        throw new IOException(failure);
      };

  callRatisRpc(pipeline.getNodes(), ozoneConf, addGroup);
}

/**
* Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
* the datanodes.
Expand Down Expand Up @@ -125,42 +96,4 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
client
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
}

/**
 * Invokes the given rpc once per datanode using a parallel stream. Each
 * invocation gets its own RaftClient, which is closed when the call ends.
 * IOExceptions from individual datanodes are logged and collected; if any
 * were collected, they are thrown together after all calls have finished.
 *
 * @param datanodes datanodes to call; nothing is done when the list is empty
 * @param ozoneConf configuration used to build the Ratis clients
 * @param rpc call to run with the client and peer of each datanode
 * @throws IOException if the call failed for at least one datanode
 */
private static void callRatisRpc(List<DatanodeDetails> datanodes,
    Configuration ozoneConf,
    CheckedBiConsumer<RaftClient, RaftPeer, IOException> rpc)
    throws IOException {
  if (datanodes.isEmpty()) {
    return;
  }

  final String rpcTypeName = ozoneConf.get(
      ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
      ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
  final RetryPolicy retries = RatisHelper.createRetryPolicy(ozoneConf);
  // Synchronized because the parallel stream may add from several threads.
  final List<IOException> failures =
      Collections.synchronizedList(new ArrayList<>());
  final int outstandingLimit =
      HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
  final SecurityConfig securityConfig = new SecurityConfig(ozoneConf);
  final GrpcTlsConfig tls =
      RatisHelper.createTlsClientConfig(securityConfig);
  final TimeDuration timeout =
      RatisHelper.getClientRequestTimeout(ozoneConf);

  datanodes.parallelStream().forEach(datanode -> {
    final RaftPeer peer = RatisHelper.toRaftPeer(datanode);
    try (RaftClient raftClient = RatisHelper.newRaftClient(
        SupportedRpcType.valueOfIgnoreCase(rpcTypeName), peer, retries,
        outstandingLimit, tls, timeout)) {
      rpc.accept(raftClient, peer);
    } catch (IOException cause) {
      final String message =
          "Failed invoke Ratis rpc " + rpc + " for " + datanode.getUuid();
      LOG.error(message, cause);
      failures.add(new IOException(message, cause));
    }
  });

  if (failures.isEmpty()) {
    return;
  }
  throw MultipleIOException.createIOException(failures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
this.lock = new ReentrantReadWriteLock();
this.conf = conf;
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager, conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
conf);
// TODO: See if thread priority needs to be set for these threads
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
Expand Down Expand Up @@ -346,6 +347,11 @@ public void triggerPipelineCreation() {
backgroundPipelineCreator.triggerPipelineCreation();
}

@Override
public PipelineFactory getPipelineFactory() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to expose this API. We can add a shutdown call for the pipelineFactory in the SCMPipelineManager#close fn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return pipelineFactory;
}

/**
* Moves the pipeline to CLOSED state and sends close container command for
* all the containers in the pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public Pipeline create(ReplicationFactor factor,
.setNodes(nodes)
.build();
}

@Override
public void shutdown() {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT - Can we add a comment like //do nothing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,9 @@ public void stop() {
} catch (Exception ex) {
LOG.error("SCM Metadata store stop failed", ex);
}

// shutdown pipeline provider.
pipelineManager.getPipelineFactory().shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public MockRatisPipelineProvider(NodeManager nodeManager,
protected void initializePipeline(Pipeline pipeline) throws IOException {
// Intentionally a no-op: do nothing as the datanodes do not exist.
}

@Override
public void shutdown() {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT - Can we add a comment like // do nothing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* Tests for RatisPipelineUtils.
*/
public class TestRatisPipelineUtils {
public class TestRatisPipelineCreateAndDestory {

private static MiniOzoneCluster cluster;
private OzoneConfiguration conf = new OzoneConfiguration();
Expand Down Expand Up @@ -97,8 +97,11 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
}

// try creating another pipeline now
RatisPipelineProvider ratisPipelineProvider = (RatisPipelineProvider)
pipelineManager.getPipelineFactory().getProvider(
HddsProtos.ReplicationType.RATIS);
try {
RatisPipelineUtils.createPipeline(pipelines.get(0), conf);
ratisPipelineProvider.createPipeline(pipelines.get(0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use pipelineManager#create call instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Assert.fail("pipeline creation should fail after shutting down pipeline");
} catch (IOException ioe) {
// in case the pipeline creation fails, MultipleIOException is thrown
Expand Down