diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index c79112d3bb6c..41598061e8af 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3479,6 +3479,34 @@ OM/S3GATEWAY OMRequest, OMResponse over grpc max message length (bytes). + + + ozone.om.grpc.read.thread.num + 32 + OZONE, OM, S3GATEWAY + + OM grpc server read thread pool core thread size. + + + + + ozone.om.grpc.bossgroup.size + 8 + OZONE, OM, S3GATEWAY + + OM grpc server netty boss event group size. + + + + + ozone.om.grpc.workergroup.size + 32 + OZONE, OM, S3GATEWAY + + OM grpc server netty worker event group size. + + + ozone.default.bucket.layout diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 4442b6d06105..9a7acb02f573 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -80,6 +80,19 @@ private OMConfigKeys() { public static final int OZONE_OM_PORT_DEFAULT = 9862; public static final String OZONE_OM_GRPC_PORT_KEY = "ozone.om.grpc.port"; + + public static final String OZONE_OM_GRPC_BOSSGROUP_SIZE_KEY = + "ozone.om.grpc.bossgroup.size"; + public static final int OZONE_OM_GRPC_BOSSGROUP_SIZE_DEFAULT = 8; + + public static final String OZONE_OM_GRPC_WORKERGROUP_SIZE_KEY = + "ozone.om.grpc.workergroup.size"; + public static final int OZONE_OM_GRPC_WORKERGROUP_SIZE_DEFAULT = 32; + + public static final String OZONE_OM_GRPC_READ_THREAD_NUM_KEY = + "ozone.om.grpc.read.thread.num"; + public static final int OZONE_OM_GRPC_READ_THREAD_NUM_DEFAULT = 32; + public static final String OZONE_OM_HTTP_ENABLED_KEY = "ozone.om.http.enabled"; public static final String OZONE_OM_HTTP_BIND_HOST_KEY = diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java index 16f8af31a755..c452bf48e460 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java @@ -19,8 +19,12 @@ import java.io.IOException; import java.util.OptionalInt; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.security.ssl.KeyStoresFactory; @@ -36,6 +40,9 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; import io.grpc.Server; @@ -46,8 +53,14 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_BOSSGROUP_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_BOSSGROUP_SIZE_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_READ_THREAD_NUM_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_READ_THREAD_NUM_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_WORKERGROUP_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_WORKERGROUP_SIZE_KEY; /** * Separated network server for gRPC transport OzoneManagerService s3g->OM. @@ -61,6 +74,10 @@ public class GrpcOzoneManagerServer { private int port; private final int maxSize; + private ThreadPoolExecutor readExecutors; + private EventLoopGroup bossEventLoopGroup; + private EventLoopGroup workerEventLoopGroup; + public GrpcOzoneManagerServer(OzoneConfiguration config, OzoneManagerProtocolServerSideTranslatorPB omTranslator, @@ -95,8 +112,41 @@ public void init(OzoneManagerProtocolServerSideTranslatorPB omTranslator, OzoneDelegationTokenSecretManager delegationTokenMgr, OzoneConfiguration omServerConfig, CertificateClient caClient) { + + int poolSize = omServerConfig.getInt(OZONE_OM_GRPC_READ_THREAD_NUM_KEY, + OZONE_OM_GRPC_READ_THREAD_NUM_DEFAULT); + + int bossGroupSize = omServerConfig.getInt(OZONE_OM_GRPC_BOSSGROUP_SIZE_KEY, + OZONE_OM_GRPC_BOSSGROUP_SIZE_DEFAULT); + + int workerGroupSize = + omServerConfig.getInt(OZONE_OM_GRPC_WORKERGROUP_SIZE_KEY, + OZONE_OM_GRPC_WORKERGROUP_SIZE_DEFAULT); + + readExecutors = new ThreadPoolExecutor(poolSize, poolSize, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("OmRpcReader-%d") + .build()); + + ThreadFactory bossFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("OmRpcBoss-ELG-%d") + .build(); + bossEventLoopGroup = new NioEventLoopGroup(bossGroupSize, bossFactory); + + ThreadFactory workerFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("OmRpcWorker-ELG-%d") + .build(); + workerEventLoopGroup = + new NioEventLoopGroup(workerGroupSize, workerFactory); + NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port) .maxInboundMessageSize(maxSize) + .bossEventLoopGroup(bossEventLoopGroup) + .workerEventLoopGroup(workerEventLoopGroup) + .channelType(NioServerSocketChannel.class) + .executor(readExecutors) .addService(ServerInterceptors.intercept( new OzoneManagerServiceGrpc(omTranslator, delegationTokenMgr, @@ -134,7 +184,11 @@ public void start() throws IOException { public void stop() { try { + readExecutors.shutdown(); + readExecutors.awaitTermination(5L, TimeUnit.SECONDS); server.shutdown().awaitTermination(10L, TimeUnit.SECONDS); + bossEventLoopGroup.shutdownGracefully().sync(); + workerEventLoopGroup.shutdownGracefully().sync(); LOG.info("Server {} is shutdown", getClass().getSimpleName()); } catch (InterruptedException ex) { LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());