DatanodeConfiguration.java
@@ -79,6 +79,18 @@ public class DatanodeConfiguration {
)
private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;

/**
* Number of threads per volume that the datanode will use for reading replicated chunks.
*/
@Config(key = "read.chunk.threads.per.volume",
type = ConfigType.INT,
defaultValue = "10",
tags = {DATANODE},
description = "Number of threads per volume that Datanode will use for " +
"reading replicated chunks."
)
private int numReadThreadPerVolume = 10;

static final int CONTAINER_DELETE_THREADS_DEFAULT = 2;
static final int BLOCK_DELETE_THREADS_DEFAULT = 5;

@@ -361,4 +373,11 @@ public void setChunkDataValidationCheck(boolean writeChunkValidationCheck) {
isChunkDataValidationCheck = writeChunkValidationCheck;
}

public void setNumReadThreadPerVolume(int threads) {
this.numReadThreadPerVolume = threads;
}

public int getNumReadThreadPerVolume() {
return numReadThreadPerVolume;
}
}
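For reviewers trying the change out, here is a minimal sketch of how the new setting is consumed. It assumes an OzoneConfiguration and the usual hdds.datanode config-group prefix for DatanodeConfiguration, so the effective key name below is an assumption rather than something stated in this diff.

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

public class ReadThreadConfigSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Assumed effective key: config-group prefix "hdds.datanode" plus the
    // "read.chunk.threads.per.volume" key from the @Config annotation above.
    conf.setInt("hdds.datanode.read.chunk.threads.per.volume", 20);

    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
    // Falls back to the default of 10 when the key is not set.
    System.out.println(dnConf.getNumReadThreadPerVolume()); // prints 20
  }
}
```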
XceiverServerGrpc.java
@@ -22,6 +22,9 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -37,6 +40,7 @@
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -45,10 +49,19 @@
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.ratis.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.ServerChannel;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +80,9 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
private final ContainerDispatcher storageContainer;
private boolean isStarted;
private DatanodeDetails datanodeDetails;

private ThreadPoolExecutor readExecutors;
private EventLoopGroup eventLoopGroup;
private Class<? extends ServerChannel> channelType;

/**
* Constructs a Grpc server class.
@@ -89,8 +104,38 @@ public XceiverServerGrpc(DatanodeDetails datanodeDetails,
this.port = 0;
}

final int threadCountPerDisk =
conf.getObject(DatanodeConfiguration.class).getNumReadThreadPerVolume();
final int numberOfDisks =
HddsServerUtil.getDatanodeStorageDirs(conf).size();
final int poolSize = threadCountPerDisk * numberOfDisks;

readExecutors = new ThreadPoolExecutor(poolSize, poolSize,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ChunkReader-%d")
.build());

ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ChunkReader-ELG-%d")
.build();

if (Epoll.isAvailable()) {
eventLoopGroup = new EpollEventLoopGroup(poolSize / 10, factory);
channelType = EpollServerSocketChannel.class;
} else {
eventLoopGroup = new NioEventLoopGroup(poolSize / 10, factory);
channelType = NioServerSocketChannel.class;
}

LOG.info("GrpcServer channel type {}", channelType.getSimpleName());
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.bossEventLoopGroup(eventLoopGroup)
.workerEventLoopGroup(eventLoopGroup)
.channelType(channelType)
.executor(readExecutors)
.addService(ServerInterceptors.intercept(
new GrpcXceiverService(dispatcher), new GrpcServerInterceptor()));

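As a rough illustration of the pool sizing above (the numbers are hypothetical, not taken from the patch): the read executor is sized to threads-per-volume times the number of configured data volumes, and the shared boss/worker event loop group gets one tenth of that.

```java
public class ReadPoolSizingSketch {
  public static void main(String[] args) {
    // Hypothetical values: 10 is the config default, 4 stands in for the
    // volume count that HddsServerUtil.getDatanodeStorageDirs(conf) returns.
    int threadCountPerDisk = 10;
    int numberOfDisks = 4;
    int poolSize = threadCountPerDisk * numberOfDisks; // 40 read-executor threads
    int eventLoopThreads = poolSize / 10;              // 4 Netty event-loop threads
    System.out.println(poolSize + " executor threads, "
        + eventLoopThreads + " event-loop threads");
  }
}
```

Note that if poolSize is below 10 the integer division yields 0, which Netty treats as "use its default thread count" rather than as an error.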
@@ -149,8 +194,11 @@ public void start() throws IOException {
@Override
public void stop() {
if (isStarted) {
server.shutdown();
try {
eventLoopGroup.shutdownGracefully().sync();
readExecutors.shutdown();
readExecutors.awaitTermination(5L, TimeUnit.SECONDS);
server.shutdown();
server.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("failed to shutdown XceiverServerGrpc", e);
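A short annotated sketch of the shutdown ordering in stop(), with editorial comments. It assumes the same fields as the method above; the final interrupt-restore line is a reviewer suggestion, not part of the patch.

```java
try {
  // Quiesce the Netty boss/worker loops first so no new reads arrive.
  eventLoopGroup.shutdownGracefully().sync();
  // Stop accepting new chunk-read tasks, then give in-flight reads a bounded wait.
  readExecutors.shutdown();
  readExecutors.awaitTermination(5L, TimeUnit.SECONDS);
  // Finally shut the gRPC server down and wait briefly for termination.
  server.shutdown();
  server.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
  LOG.error("failed to shutdown XceiverServerGrpc", e);
  Thread.currentThread().interrupt(); // suggestion: restore the interrupt flag
}
```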