Skip to content

Commit

Permalink
RATIS-2155. Add a builder for RatisShell. (#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored Sep 18, 2024
1 parent 94f3fef commit 6451f9b
Show file tree
Hide file tree
Showing 19 changed files with 283 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ private ManagedChannel buildChannel(String address, GrpcTlsConfig tlsConf,
channelBuilder.proxyDetector(uri -> null);

if (tlsConf != null) {
LOG.debug("Setting TLS for {}", address);
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
GrpcUtil.setTrustManager(sslContextBuilder, tlsConf.getTrustManager());
if (tlsConf.getMtlsEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ private NettyServerBuilder newNettyServerBuilder(String hostname, int port, Grpc
.flowControlWindow(flowControlWindow.getSizeInt());

if (tlsConfig != null) {
LOG.info("Setting TLS for {}", address);
SslContextBuilder sslContextBuilder = GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
if (tlsConfig.getMtlsEnabled()) {
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ default Factory<MiniRaftClusterWithGrpc> getFactory() {
public static final DelayLocalExecutionInjection SEND_SERVER_REQUEST_INJECTION =
new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);

protected MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties,
Parameters parameters) {
public MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties, Parameters parameters) {
this(ids, new String[0], properties, parameters);
}

public MiniRaftClusterWithGrpc(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
super(ids, listenerIds, properties, parameters);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,29 @@
package org.apache.ratis.shell.cli;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedFunction;

import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.UUID;

/**
* Utilities for command line interface.
*/
public final class CliUtils {
private static final ExponentialBackoffRetry RETRY_POLICY = ExponentialBackoffRetry.newBuilder()
.setBaseSleepTime(TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS))
.setMaxAttempts(10)
.setMaxSleepTime(TimeDuration.valueOf(100_000, TimeUnit.MILLISECONDS))
.build();

private CliUtils() {
// prevent instantiation
}
Expand All @@ -68,24 +55,6 @@ public static RaftPeerId getPeerId(String host, int port) {
return RaftPeerId.getRaftPeerId(host + "_" + port);
}

/** Create a new {@link RaftClient} from the given group. */
public static RaftClient newRaftClient(RaftGroup group) {
RaftProperties properties = new RaftProperties();
RaftClientConfigKeys.Rpc.setRequestTimeout(properties,
TimeDuration.valueOf(15, TimeUnit.SECONDS));

// Since ratis-shell support GENERIC_COMMAND_OPTIONS, here we should
// merge these options to raft properties to make it work.
final Properties sys = System.getProperties();
sys.stringPropertyNames().forEach(key -> properties.set(key, sys.getProperty(key)));

return RaftClient.newBuilder()
.setRaftGroup(group)
.setProperties(properties)
.setRetryPolicy(RETRY_POLICY)
.build();
}

/**
* Apply the given function to the given parameter a list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.ratis.shell.cli.sh;

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.shell.cli.AbstractShell;
import org.apache.ratis.shell.cli.Command;
import org.apache.ratis.shell.cli.sh.command.AbstractParentCommand;
Expand Down Expand Up @@ -60,7 +63,11 @@ public static void main(String[] args) {
}

public RatisShell(PrintStream out) {
super(new Context(out));
this(new Context(out));
}

private RatisShell(Context context) {
super(context);
}

@Override
Expand All @@ -73,4 +80,39 @@ protected Map<String, Command> loadCommands(Context context) {
return allParentCommands(context).stream()
.collect(Collectors.toMap(Command::getCommandName, Function.identity()));
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {
private PrintStream printStream = System.out;
private RetryPolicy retryPolicy;
private RaftProperties properties;
private Parameters parameters;

public Builder setPrintStream(PrintStream printStream) {
this.printStream = printStream;
return this;
}

public Builder setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}

public Builder setProperties(RaftProperties properties) {
this.properties = properties;
return this;
}

public Builder setParameters(Parameters parameters) {
this.parameters = parameters;
return this;
}

public RatisShell build() {
return new RatisShell(new Context(printStream, false, retryPolicy, properties, parameters));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,25 @@
*/
public abstract class AbstractCommand implements Command {

private final PrintStream printStream;
private final Context context;

protected AbstractCommand(Context context) {
printStream = context.getPrintStream();
this.context = context;
}

protected Context getContext() {
return context;
}

protected PrintStream getPrintStream() {
return printStream;
return getContext().getPrintStream();
}

protected void printf(String format, Object... args) {
printStream.printf(format, args);
getPrintStream().printf(format, args);
}

protected void println(Object message) {
printStream.println(message);
getPrintStream().println(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,18 @@ public int run(CommandLine cl) throws IOException {
final RaftGroupId groupIdSpecified = CliUtils.parseRaftGroupId(cl.getOptionValue(GROUPID_OPTION_NAME));
raftGroup = RaftGroup.valueOf(groupIdSpecified != null? groupIdSpecified: RaftGroupId.randomId(), peers);
PrintStream printStream = getPrintStream();
try (final RaftClient client = CliUtils.newRaftClient(raftGroup)) {
try (final RaftClient client = newRaftClient()) {
final RaftGroupId remoteGroupId = CliUtils.getGroupId(client, peers, groupIdSpecified, printStream);
groupInfoReply = CliUtils.getGroupInfo(client, peers, remoteGroupId, printStream);
raftGroup = groupInfoReply.getGroup();
}
return 0;
}

protected RaftClient newRaftClient() {
return getContext().newRaftClient(getRaftGroup());
}

@Override
public Options getOptions() {
return new Options()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,59 @@
*/
package org.apache.ratis.shell.cli.sh.command;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.retry.ExponentialBackoffRetry;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.common.io.Closer;
import org.apache.ratis.util.TimeDuration;

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
* A context for ratis-shell.
*/
public final class Context implements Closeable {
private static final TimeDuration DEFAULT_REQUEST_TIMEOUT = TimeDuration.valueOf(15, TimeUnit.SECONDS);
private static final RetryPolicy DEFAULT_RETRY_POLICY = ExponentialBackoffRetry.newBuilder()
.setBaseSleepTime(TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS))
.setMaxAttempts(10)
.setMaxSleepTime(TimeDuration.valueOf(100_000, TimeUnit.MILLISECONDS))
.build();

private final PrintStream mPrintStream;
private final Closer mCloser;

private final boolean cli;
private final RetryPolicy retryPolicy;
private final RaftProperties properties;
private final Parameters parameters;

/**
* Build a context.
* @param printStream the print stream
*/
public Context(PrintStream printStream) {
this(printStream, true, DEFAULT_RETRY_POLICY, new RaftProperties(), null);
}

public Context(PrintStream printStream, boolean cli, RetryPolicy retryPolicy,
RaftProperties properties, Parameters parameters) {
mCloser = Closer.create();
mPrintStream = mCloser.register(Objects.requireNonNull(printStream, "printStream == null"));

this.cli = cli;
this.retryPolicy = retryPolicy != null? retryPolicy : DEFAULT_RETRY_POLICY;
this.properties = properties != null? properties : new RaftProperties();
this.parameters = parameters;
}

/**
Expand All @@ -47,6 +79,43 @@ public PrintStream getPrintStream() {
return mPrintStream;
}

/** Is this from CLI? */
public boolean isCli() {
return cli;
}

public RetryPolicy getRetryPolicy() {
return retryPolicy;
}

public RaftProperties getProperties() {
return properties;
}

public Parameters getParameters() {
return parameters;
}

/** Create a new {@link RaftClient} from the given group. */
public RaftClient newRaftClient(RaftGroup group) {
final RaftProperties p = getProperties();
if (isCli()) {
RaftClientConfigKeys.Rpc.setRequestTimeout(p, DEFAULT_REQUEST_TIMEOUT);

// Since ratis-shell support GENERIC_COMMAND_OPTIONS, here we should
// merge these options to raft p to make it work.
final Properties sys = System.getProperties();
sys.stringPropertyNames().forEach(key -> p.set(key, sys.getProperty(key)));
}

return RaftClient.newBuilder()
.setRaftGroup(group)
.setProperties(p)
.setParameters(getParameters())
.setRetryPolicy(getRetryPolicy())
.build();
}

@Override
public void close() throws IOException {
mCloser.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.shell.cli.CliUtils;
import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;

Expand Down Expand Up @@ -61,7 +60,7 @@ public int run(CommandLine cl) throws IOException {
printf("Peer not found: %s", strAddr);
return -1;
}
try(final RaftClient raftClient = CliUtils.newRaftClient(getRaftGroup())) {
try(final RaftClient raftClient = newRaftClient()) {
RaftClientReply reply = raftClient.getLeaderElectionManagementApi(peerId).pause();
processReply(reply, () -> String.format("Failed to pause leader election on peer %s", strAddr));
printf(String.format("Successful pause leader election on peer %s", strAddr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.shell.cli.CliUtils;
import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;

Expand Down Expand Up @@ -61,7 +60,7 @@ public int run(CommandLine cl) throws IOException {
printf("Can't find a sever with the address:%s", strAddr);
return -1;
}
try(final RaftClient raftClient = CliUtils.newRaftClient(getRaftGroup())) {
try(final RaftClient raftClient = newRaftClient()) {
RaftClientReply reply = raftClient.getLeaderElectionManagementApi(peerId).resume();
processReply(reply, () -> String.format("Failed to resume leader election on peer %s", strAddr));
printf(String.format("Successful pause leader election on peer %s", strAddr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.shell.cli.CliUtils;
import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;

Expand All @@ -48,7 +47,7 @@ public String getCommandName() {
public int run(CommandLine cl) throws IOException {
super.run(cl);

try (RaftClient client = CliUtils.newRaftClient(getRaftGroup())) {
try (RaftClient client = newRaftClient()) {
RaftPeerId leaderId = RaftPeerId.valueOf(getLeader(getGroupInfoReply().getRoleInfoProto()).getId());
final RaftClientReply transferLeadershipReply = client.admin().transferLeadership(null, leaderId, 60_000);
processReply(transferLeadershipReply, () -> "Failed to step down leader");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.shell.cli.CliUtils;
import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -74,7 +73,7 @@ public int run(CommandLine cl) throws IOException {
printf("Peer with address %s not found.", strAddr);
return -2;
}
try (RaftClient client = CliUtils.newRaftClient(getRaftGroup())) {
try (RaftClient client = newRaftClient()) {
// transfer leadership
if (!tryTransfer(client, newLeader, highestPriority, timeout.orElse(timeoutDefault))) {
// legacy mode, transfer leadership by setting priority.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public int run(CommandLine cl) throws IOException {
+ " options are missing.");
}

try(final RaftClient raftClient = CliUtils.newRaftClient(getRaftGroup())) {
try(final RaftClient raftClient = newRaftClient()) {
GroupListReply reply = raftClient.getGroupManagementApi(peerId).list();
processReply(reply, () -> String.format("Failed to get group information of peerId %s (server %s)",
peerId, address));
Expand Down
Loading

0 comments on commit 6451f9b

Please sign in to comment.