Skip to content

Commit

Permalink
[ISSUE apache#6117]Optimize NettyRemotingServer EventLoopGroup create
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Feb 23, 2023
1 parent 7cfffe7 commit f626071
Showing 1 changed file with 14 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand All @@ -74,8 +73,7 @@
@SuppressWarnings("NullableProblems")
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
private static final Logger TRAFFIC_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_TRAFFIC_NAME);
private static final Logger TRAFFIC_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_TRAFFIC_NAME);

private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
Expand Down Expand Up @@ -128,47 +126,17 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,

private EventLoopGroup buildEventLoopGroupSelector() {
if (useEpoll()) {
return new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
return new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactoryImpl("NettyServerEPOLLSelector_"));
} else {
return new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
return new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactoryImpl("NettyServerNIOSelector_"));
}
}

private EventLoopGroup buildBossEventLoopGroup() {
if (useEpoll()) {
return new EpollEventLoopGroup(1, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
return new EpollEventLoopGroup(1, new ThreadFactoryImpl("NettyEPOLLBoss_"));
} else {
return new NioEventLoopGroup(1, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
return new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyNIOBoss_"));
}
}

Expand All @@ -178,23 +146,13 @@ private ExecutorService buildPublicExecutor(NettyServerConfig nettyServerConfig)
publicThreadNums = 4;
}

return Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
return Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyServerPublicExecutor_"));
}

private ScheduledExecutorService buildScheduleExecutor() {
return new ScheduledThreadPoolExecutor(1,
r -> {
Thread thread = new Thread(r, "NettyServerScheduler");
thread.setDaemon(true);
return thread;
}, new ThreadPoolExecutor.DiscardOldestPolicy());
new ThreadFactoryImpl("NettyServerScheduler_", true),
new ThreadPoolExecutor.DiscardOldestPolicy());
}

public void loadSslContext() {
Expand All @@ -219,17 +177,8 @@ private boolean useEpoll() {

@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {

private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("NettyServerCodecThread_"));

prepareSharableHandlers();

Expand Down Expand Up @@ -291,6 +240,7 @@ public void run() {

/**
* config channel in ChannelInitializer
*
* @param ch the SocketChannel needed to init
* @return the initialized ChannelPipeline, sub class can use it to extent in the future
*/
Expand Down Expand Up @@ -444,13 +394,13 @@ private void printRemotingCodeDistribution() {
if (distributionHandler != null) {
String inBoundSnapshotString = distributionHandler.getInBoundSnapshotString();
if (inBoundSnapshotString != null) {
TRAFFIC_LOGGER.info("Port: {}, RequestCode Distribution: {}",
TRAFFIC_LOGGER.info("Port: {}, RequestCode Distribution: {}",
nettyServerConfig.getListenPort(), inBoundSnapshotString);
}

String outBoundSnapshotString = distributionHandler.getOutBoundSnapshotString();
if (outBoundSnapshotString != null) {
TRAFFIC_LOGGER.info("Port: {}, ResponseCode Distribution: {}",
TRAFFIC_LOGGER.info("Port: {}, ResponseCode Distribution: {}",
nettyServerConfig.getListenPort(), outBoundSnapshotString);
}
}
Expand Down

0 comments on commit f626071

Please sign in to comment.