Skip to content

Commit

Permalink
[ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the…
Browse files Browse the repository at this point in the history
… scalability of ProtocolNegotiator (#6867)

[ISSUE #6866] Move the judgment logic of grpc TLS mode to improve the scalability of ProtocolNegotiator

---------

Co-authored-by: 徒钟 <[email protected]>
  • Loading branch information
dingshuangxi888 and 徒钟 authored Jun 8, 2023
1 parent f10f905 commit 4d82b30
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,11 @@
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.cert.CertificateException;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,13 +34,10 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.interceptor.AuthenticationInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;

public class GrpcServerBuilder {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
Expand All @@ -63,12 +50,7 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
serverBuilder = NettyServerBuilder.forPort(port);

try {
configSslContext(serverBuilder);
} catch (Exception e) {
log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
throw new RuntimeException("grpc tls set failed: " + e.getMessage());
}
serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator());

// build server
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
Expand Down Expand Up @@ -116,45 +98,6 @@ public GrpcServer build() {
return new GrpcServer(this.serverBuilder.build());
}

protected void configSslContext(NettyServerBuilder serverBuilder) throws IOException, CertificateException {
if (null == serverBuilder) {
return;
}

TlsMode tlsMode = TlsSystemConfig.tlsMode;
if (!TlsMode.DISABLED.equals(tlsMode)) {
SslContext sslContext = loadSslContext();
if (TlsMode.PERMISSIVE.equals(tlsMode)) {
serverBuilder.protocolNegotiator(new OptionalSSLProtocolNegotiator(sslContext));
} else {
serverBuilder.sslContext(sslContext);
}
}
}

protected SslContext loadSslContext() throws CertificateException, IOException {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isTlsTestModeEnable()) {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return GrpcSslContexts.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
} else {
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
try (InputStream serverKeyInputStream = Files.newInputStream(Paths.get(tlsKeyPath));
InputStream serverCertificateStream = Files.newInputStream(Paths.get(tlsCertPath))) {
SslContext res = GrpcSslContexts.forServer(serverCertificateStream, serverKeyInputStream)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
log.info("load TLS configured OK");
return res;
}
}
}

public GrpcServerBuilder configInterceptor() {
// grpc interceptors, including acl, logging etc.
List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,44 @@
package org.apache.rocketmq.proxy.grpc;

import io.grpc.netty.shaded.io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.shaded.io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandler;
import io.grpc.netty.shaded.io.netty.channel.ChannelHandlerContext;
import io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;

public class OptionalSSLProtocolNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final SslContext sslContext;
protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

/**
* the length of the ssl record header (in bytes)
*/
private static final int SSL_RECORD_HEADER_LENGTH = 5;

public OptionalSSLProtocolNegotiator(SslContext sslContext) {
this.sslContext = sslContext;
private static SslContext sslContext;

public OptionalSSLProtocolNegotiator() {
sslContext = loadSslContext();
}

@Override
Expand All @@ -50,43 +63,81 @@ public AsciiString scheme() {
}

@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHttp2ConnectionHandler) {
ChannelHandler plaintext =
InternalProtocolNegotiators.serverPlaintext().newHandler(grpcHttp2ConnectionHandler);
ChannelHandler ssl =
InternalProtocolNegotiators.serverTls(sslContext).newHandler(grpcHttp2ConnectionHandler);
return new PortUnificationServerHandler(ssl, plaintext);
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
return new PortUnificationServerHandler(grpcHandler);
}

@Override
public void close() {}

private static SslContext loadSslContext() {
try {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isTlsTestModeEnable()) {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
} else {
String tlsKeyPath = ConfigurationManager.getProxyConfig().getTlsKeyPath();
String tlsCertPath = ConfigurationManager.getProxyConfig().getTlsCertPath();
try (InputStream serverKeyInputStream = Files.newInputStream(
Paths.get(tlsKeyPath));
InputStream serverCertificateStream = Files.newInputStream(
Paths.get(tlsCertPath))) {
SslContext res = GrpcSslContexts.forServer(serverCertificateStream,
serverKeyInputStream)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.clientAuth(ClientAuth.NONE)
.build();
log.info("grpc load TLS configured OK");
return res;
}
}
} catch (Exception e) {
log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
throw new RuntimeException("grpc tls set failed: " + e.getMessage());
}
}

public static class PortUnificationServerHandler extends ByteToMessageDecoder {

private final ChannelHandler ssl;
private final ChannelHandler plaintext;

public PortUnificationServerHandler(ChannelHandler ssl, ChannelHandler plaintext) {
this.ssl = ssl;
this.plaintext = plaintext;
public PortUnificationServerHandler(GrpcHttp2ConnectionHandler grpcHandler) {
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
.newHandler(grpcHandler);
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
.newHandler(grpcHandler);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
throws Exception {
try {
// in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not
if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
return;
}
if (SslHandler.isEncrypted(in)) {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
if (TlsMode.ENFORCING.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
} else {
} else if (TlsMode.DISABLED.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
} else {
// in SslHandler.isEncrypted, it need at least 5 bytes to judge is encrypted or not
if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
return;
}
if (SslHandler.isEncrypted(in)) {
ctx.pipeline().addAfter(ctx.name(), null, this.ssl);
} else {
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
}
}
ctx.fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());
ctx.pipeline().remove(this);
} catch (Exception e) {
log.error("process protocol negotiator failed.", e);
log.error("process ssl protocol negotiator failed.", e);
throw e;
}
}
Expand Down

0 comments on commit 4d82b30

Please sign in to comment.