From d217c7d5c79daa19aefbdd875d36b41b74f5c74a Mon Sep 17 00:00:00 2001
From: Sammi Chen
Date: Wed, 10 Nov 2021 16:45:37 +0800
Subject: [PATCH] HDDS-5962. Limit grpc threads in XceiverServerGrpc.

Introduce the DatanodeConfiguration setting "read.chunk.threads.per.volume"
(default 10) and use it in XceiverServerGrpc to bound the threads that
serve gRPC requests:

* The gRPC server is given a ThreadPoolExecutor whose core and maximum
  size are the per-volume thread count multiplied by the number of
  datanode storage directories.
* The Netty boss and worker roles share one event loop group created with
  poolSize / 10 threads; epoll is used when available, NIO otherwise.
* stop() shuts the server down and waits for it first, and only then
  shuts down the executor and the event loop group. Both are supplied by
  this class, so the server does not own them and still needs them while
  it is terminating.
---
 .../statemachine/DatanodeConfiguration.java | 19 +++++++
 .../transport/server/XceiverServerGrpc.java | 52 ++++++++++++++++++-
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index c483262f3d1d..a2509a119bcf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -79,6 +79,18 @@ public class DatanodeConfiguration {
   )
   private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
 
+  /**
+   * Number of threads per volume that Datanode will use for chunk read.
+   */
+  @Config(key = "read.chunk.threads.per.volume",
+      type = ConfigType.INT,
+      defaultValue = "10",
+      tags = {DATANODE},
+      description = "Number of threads per volume that Datanode will use for " +
+          "reading replicated chunks."
+  )
+  private int numReadThreadPerVolume = 10;
+
   static final int CONTAINER_DELETE_THREADS_DEFAULT = 2;
   static final int BLOCK_DELETE_THREADS_DEFAULT = 5;
 
@@ -361,4 +373,11 @@ public void setChunkDataValidationCheck(boolean writeChunkValidationCheck) {
     isChunkDataValidationCheck = writeChunkValidationCheck;
   }
 
+  public void setNumReadThreadPerVolume(int threads) {
+    this.numReadThreadPerVolume = threads;
+  }
+
+  public int getNumReadThreadPerVolume() {
+    return numReadThreadPerVolume;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index c3157f6e9f86..a83bbfb96e6b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -22,6 +22,9 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -37,6 +40,7 @@
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -45,10 +49,19 @@
 import io.opentracing.Scope;
 import io.opentracing.Span;
 import io.opentracing.util.GlobalTracer;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.ratis.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.ratis.thirdparty.io.grpc.Server;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +80,9 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
   private final ContainerDispatcher storageContainer;
   private boolean isStarted;
   private DatanodeDetails datanodeDetails;
-
+  private ThreadPoolExecutor readExecutors;
+  private EventLoopGroup eventLoopGroup;
+  private Class<? extends ServerChannel> channelType;
 
   /**
    * Constructs a Grpc server class.
@@ -89,8 +104,38 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails,
       this.port = 0;
     }
 
+    final int threadCountPerDisk =
+        conf.getObject(DatanodeConfiguration.class).getNumReadThreadPerVolume();
+    final int numberOfDisks =
+        HddsServerUtil.getDatanodeStorageDirs(conf).size();
+    final int poolSize = threadCountPerDisk * numberOfDisks;
+
+    readExecutors = new ThreadPoolExecutor(poolSize, poolSize,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("ChunkReader-%d")
+            .build());
+
+    ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("ChunkReader-ELG-%d")
+        .build();
+
+    if (Epoll.isAvailable()) {
+      eventLoopGroup = new EpollEventLoopGroup(poolSize / 10, factory);
+      channelType = EpollServerSocketChannel.class;
+    } else {
+      eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory);
+      channelType = NioServerSocketChannel.class;
+    }
+
+    LOG.info("GrpcServer channel type {}", channelType.getSimpleName());
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
+        .bossEventLoopGroup(eventLoopGroup)
+        .workerEventLoopGroup(eventLoopGroup)
+        .channelType(channelType)
+        .executor(readExecutors)
         .addService(ServerInterceptors.intercept(
             new GrpcXceiverService(dispatcher), new GrpcServerInterceptor()));
 
@@ -149,8 +194,11 @@ public void start() throws IOException {
   @Override
   public void stop() {
     if (isStarted) {
-      server.shutdown();
       try {
+        server.shutdown();
         server.awaitTermination(5, TimeUnit.SECONDS);
+        readExecutors.shutdown();
+        readExecutors.awaitTermination(5L, TimeUnit.SECONDS);
+        eventLoopGroup.shutdownGracefully().sync();
       } catch (InterruptedException e) {
         LOG.error("failed to shutdown XceiverServerGrpc", e);