Skip to content

Commit

Permalink
[ISSUE apache#6117]Optimize NettyRemotingServer EventLoopGroup create
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Feb 19, 2023
1 parent 975b4f1 commit 183138f
Showing 1 changed file with 12 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
Expand All @@ -52,11 +53,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -127,47 +128,17 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,

private EventLoopGroup buildEventLoopGroupSelector() {
if (useEpoll()) {
return new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
return new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactoryImpl("NettyServerEPOLLSelector_"));
} else {
return new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
private final int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
return new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactoryImpl("NettyServerNIOSelector_"));
}
}

private EventLoopGroup buildBossEventLoopGroup() {
if (useEpoll()) {
return new EpollEventLoopGroup(1, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
return new EpollEventLoopGroup(1, new ThreadFactoryImpl("NettyEPOLLBoss_"));
} else {
return new NioEventLoopGroup(1, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
return new NioEventLoopGroup(1, new ThreadFactoryImpl("NettyNIOBoss_"));
}
}

Expand All @@ -177,23 +148,13 @@ private ExecutorService buildPublicExecutor(NettyServerConfig nettyServerConfig)
publicThreadNums = 4;
}

return Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
return Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyServerPublicExecutor_"));
}

private ScheduledExecutorService buildScheduleExecutor() {
return new ScheduledThreadPoolExecutor(1,
r -> {
Thread thread = new Thread(r, "NettyServerScheduler");
thread.setDaemon(true);
return thread;
}, new ThreadPoolExecutor.DiscardOldestPolicy());
new ThreadFactoryImpl("NettyServerScheduler_", true),
new ThreadPoolExecutor.DiscardOldestPolicy());
}

public void loadSslContext() {
Expand All @@ -220,15 +181,7 @@ private boolean useEpoll() {
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {

private final AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
new ThreadFactoryImpl("NettyServerCodecThread_"));

prepareSharableHandlers();

Expand Down Expand Up @@ -290,6 +243,7 @@ public void run() {

/**
* config channel in ChannelInitializer
*
* @param ch the SocketChannel needed to init
* @return the initialized ChannelPipeline, sub class can use it to extent in the future
*/
Expand Down

0 comments on commit 183138f

Please sign in to comment.