Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2168. Support custom gRPC services. #1169

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ratis-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks a lot for reviewing this!

<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-server-api</artifactId>
</dependency>
<dependency>
<artifactId>ratis-server</artifactId>
<groupId>org.apache.ratis</groupId>
Expand Down
29 changes: 20 additions & 9 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.server.GrpcServices;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -230,15 +231,6 @@ static void setAsyncRequestThreadPoolSize(RaftProperties properties, int port) {
setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port);
}

String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
static GrpcTlsConfig tlsConf(Parameters parameters) {
return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
}
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}

String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".leader.outstanding.appends.max";
int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 8;
static int leaderOutstandingAppendsMax(RaftProperties properties) {
Expand Down Expand Up @@ -301,6 +293,25 @@ static boolean zeroCopyEnabled(RaftProperties properties) {
static void setZeroCopyEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, ZERO_COPY_ENABLED_KEY, enabled);
}

String SERVICES_CUSTOMIZER_PARAMETER = PREFIX + ".services.customizer";
Class<GrpcServices.Customizer> SERVICES_CUSTOMIZER_CLASS = GrpcServices.Customizer.class;
static GrpcServices.Customizer servicesCustomizer(Parameters parameters) {
return parameters == null ? null
: parameters.get(SERVICES_CUSTOMIZER_PARAMETER, SERVICES_CUSTOMIZER_CLASS);
}
static void setServicesCustomizer(Parameters parameters, GrpcServices.Customizer customizer) {
parameters.put(SERVICES_CUSTOMIZER_PARAMETER, customizer, SERVICES_CUSTOMIZER_CLASS);
}

String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
static GrpcTlsConfig tlsConf(Parameters parameters) {
return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
}
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}
}

String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
Expand Down
19 changes: 13 additions & 6 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientRpc;
import org.apache.ratis.grpc.server.GrpcLogAppender;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.grpc.server.GrpcServices;
import org.apache.ratis.grpc.server.GrpcServicesImpl;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -64,6 +65,8 @@ static boolean checkPooledByteBufAllocatorUseCacheForAllThreads(Consumer<String>
return value;
}

private final GrpcServices.Customizer servicesCustomizer;

private final GrpcTlsConfig tlsConfig;
private final GrpcTlsConfig adminTlsConfig;
private final GrpcTlsConfig clientTlsConfig;
Expand All @@ -76,7 +79,7 @@ public static Parameters newRaftParameters(GrpcTlsConfig conf) {
}

public GrpcFactory(Parameters parameters) {
this(
this(GrpcConfigKeys.Server.servicesCustomizer(parameters),
GrpcConfigKeys.TLS.conf(parameters),
GrpcConfigKeys.Admin.tlsConf(parameters),
GrpcConfigKeys.Client.tlsConf(parameters),
Expand All @@ -85,11 +88,14 @@ public GrpcFactory(Parameters parameters) {
}

public GrpcFactory(GrpcTlsConfig tlsConfig) {
this(tlsConfig, null, null, null);
this(null, tlsConfig, null, null, null);
}

private GrpcFactory(GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
private GrpcFactory(GrpcServices.Customizer servicesCustomizer,
GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
this.servicesCustomizer = servicesCustomizer;

this.tlsConfig = tlsConfig;
this.adminTlsConfig = adminTlsConfig;
this.clientTlsConfig = clientTlsConfig;
Expand Down Expand Up @@ -123,10 +129,11 @@ public LogAppender newLogAppender(RaftServer.Division server, LeaderState state,
}

@Override
public GrpcService newRaftServerRpc(RaftServer server) {
public GrpcServices newRaftServerRpc(RaftServer server) {
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::info);
return GrpcService.newBuilder()
return GrpcServicesImpl.newBuilder()
.setServer(server)
.setCustomizer(servicesCustomizer)
.setAdminTlsConfig(getAdminTlsConfig())
.setServerTlsConfig(getServerTlsConfig())
.setClientTlsConfig(getClientTlsConfig())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, Foll
}

@Override
public GrpcService getServerRpc() {
return (GrpcService)super.getServerRpc();
public GrpcServicesImpl getServerRpc() {
return (GrpcServicesImpl)super.getServerRpc();
}

private GrpcServerProtocolClient getClient() throws IOException {
Expand Down Expand Up @@ -428,7 +428,7 @@ private static void sleep(TimeDuration waitTime, boolean heartbeat)

private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto) throws InterruptedIOException {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
CodeInjectionForTesting.execute(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST,
getServer().getId(), null, proto);
resetHeartbeatTrigger();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.grpc.server;

import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;

import java.util.EnumSet;

/** The gRPC services extending {@link RaftServerRpc}. */
public interface GrpcServices extends RaftServerRpc {
/** The type of the services. */
enum Type {ADMIN, CLIENT, SERVER}

/**
* To customize the services.
* For example, add a custom service.
*/
interface Customizer {
/** The default NOOP {@link Customizer}. */
class Default implements Customizer {
private static final Default INSTANCE = new Default();

@Override
public NettyServerBuilder customize(NettyServerBuilder builder, EnumSet<GrpcServices.Type> types) {
return builder;
}
}

static Customizer getDefaultInstance() {
return Default.INSTANCE;
}

/**
* Customize the given builder for the given types.
*
* @return the customized builder.
*/
NettyServerBuilder customize(NettyServerBuilder builder, EnumSet<GrpcServices.Type> types);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
Expand Down Expand Up @@ -51,6 +52,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -60,11 +62,12 @@
import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;

/** A grpc implementation of {@link org.apache.ratis.server.RaftServerRpc}. */
public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient,
PeerProxyMap<GrpcServerProtocolClient>> {
static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
public final class GrpcServicesImpl
extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>>
implements GrpcServices {
static final Logger LOG = LoggerFactory.getLogger(GrpcServicesImpl.class);
public static final String GRPC_SEND_SERVER_REQUEST =
JavaUtils.getClassSimpleName(GrpcService.class) + ".sendRequest";
JavaUtils.getClassSimpleName(GrpcServicesImpl.class) + ".sendRequest";

class AsyncService implements RaftServerAsynchronousProtocol {

Expand Down Expand Up @@ -102,6 +105,7 @@ public void onCompleted() {

public static final class Builder {
private RaftServer server;
private Customizer customizer;

private String adminHost;
private int adminPort;
Expand Down Expand Up @@ -150,6 +154,11 @@ public Builder setServer(RaftServer raftServer) {
return this;
}

public Builder setCustomizer(Customizer customizer) {
this.customizer = customizer != null? customizer : Customizer.getDefaultInstance();
return this;
}

private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) {
return new GrpcServerProtocolClient(target, flowControlWindow.getSizeInt(),
requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
Expand Down Expand Up @@ -177,6 +186,10 @@ private MetricServerInterceptor newMetricServerInterceptor() {
JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort);
}

Server buildServer(NettyServerBuilder builder, EnumSet<GrpcServices.Type> types) {
return customizer.customize(builder, types).build();
}

private NettyServerBuilder newNettyServerBuilderForServer() {
return newNettyServerBuilder(serverHost, serverPort, serverTlsConfig);
}
Expand Down Expand Up @@ -223,21 +236,24 @@ private boolean separateClientServer() {
}

Server newServer(GrpcClientProtocolService client, ZeroCopyMetrics zeroCopyMetrics, ServerInterceptor interceptor) {
final EnumSet<GrpcServices.Type> types = EnumSet.of(GrpcServices.Type.SERVER);
final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer();
final ServerServiceDefinition service = newGrpcServerProtocolService(zeroCopyMetrics).bindServiceWithZeroCopy();
serverBuilder.addService(ServerInterceptors.intercept(service, interceptor));

if (!separateAdminServer()) {
types.add(GrpcServices.Type.ADMIN);
addAdminService(serverBuilder, server, interceptor);
}
if (!separateClientServer()) {
types.add(GrpcServices.Type.CLIENT);
addClientService(serverBuilder, client, interceptor);
}
return serverBuilder.build();
return buildServer(serverBuilder, types);
}

public GrpcService build() {
return new GrpcService(this);
public GrpcServicesImpl build() {
return new GrpcServicesImpl(this);
}

public Builder setAdminTlsConfig(GrpcTlsConfig config) {
Expand Down Expand Up @@ -273,11 +289,7 @@ public static Builder newBuilder() {
private final MetricServerInterceptor serverInterceptor;
private final ZeroCopyMetrics zeroCopyMetrics = new ZeroCopyMetrics();

public MetricServerInterceptor getServerInterceptor() {
return serverInterceptor;
}

private GrpcService(Builder b) {
private GrpcServicesImpl(Builder b) {
super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient));

this.executor = b.newExecutor();
Expand All @@ -291,7 +303,7 @@ private GrpcService(Builder b) {
if (b.separateAdminServer()) {
final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin();
addAdminService(builder, b.server, serverInterceptor);
final Server adminServer = builder.build();
final Server adminServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.ADMIN));
servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
adminServerAddressSupplier = newAddressSupplier(b.adminPort, adminServer);
} else {
Expand All @@ -301,7 +313,7 @@ private GrpcService(Builder b) {
if (b.separateClientServer()) {
final NettyServerBuilder builder = b.newNettyServerBuilderForClient();
addClientService(builder, clientProtocolService, serverInterceptor);
final Server clientServer = builder.build();
final Server clientServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.CLIENT));
servers.put(GrpcClientProtocolService.class.getName(), clientServer);
clientServerAddressSupplier = newAddressSupplier(b.clientPort, clientServer);
} else {
Expand Down Expand Up @@ -419,6 +431,11 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ
return getProxies().getProxy(target).startLeaderElection(request);
}

@VisibleForTesting
MessageMetrics getMessageMetrics() {
return serverInterceptor.getMetrics();
}

@VisibleForTesting
public ZeroCopyMetrics getZeroCopyMetrics() {
return zeroCopyMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.grpc.server.GrpcServicesImpl;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
Expand Down Expand Up @@ -62,7 +62,7 @@ default Factory<MiniRaftClusterWithGrpc> getFactory() {
}

public static final DelayLocalExecutionInjection SEND_SERVER_REQUEST_INJECTION =
new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
new DelayLocalExecutionInjection(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST);

public MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties, Parameters parameters) {
this(ids, new String[0], properties, parameters);
Expand Down Expand Up @@ -101,7 +101,7 @@ public void assertZeroCopyMetrics() {
getServers().forEach(server -> server.getGroupIds().forEach(id -> {
LOG.info("Checking {}-{}", server.getId(), id);
RaftServer.Division division = RaftServerTestUtil.getDivision(server, id);
GrpcService service = (GrpcService) RaftServerTestUtil.getServerRpc(division);
final GrpcServicesImpl service = (GrpcServicesImpl) RaftServerTestUtil.getServerRpc(division);
ZeroCopyMetrics zeroCopyMetrics = service.getZeroCopyMetrics();
Assert.assertEquals(0, zeroCopyMetrics.nonZeroCopyMessages());
Assert.assertEquals("Zero copy messages are not released, please check logs to find leaks. ",
Expand Down
Loading
Loading