第一阶段:创建阶段
-
1.NioEventLoopGroup构造方法
ThreadFactory创建、每一个线程独立的名字
默认创建CPU核心数*2(Runtime.getRuntime.availableProcessors() * 2)
每个线程会有一个独立的名字ThreadPerTaskExecutor Thread
-
2.new Child()方法
给每一个EventLoop创建一个队列tailTasks
newMpscQueue(无锁串行化队列)
-
3.会被放入到线程队列里面去MpsQueue(线程池内部的队列)
-
4.new Chooser() 获取一个线程(封装成了EventLoop对象)
执行阶段会调用EventLoop.execute()
位运算比取余运算效率更高
第二阶段:执行阶段
无锁化串行的执行流程
-
1.bind()
-
2.调用eventLoop的execute()方法
inEventLoop()现在拿到的这个EventLoop是不是自己Netty创建的线程
每个线程任务,一定是Netty创建的,保证线程安全性
-
3.select 关联,轮询所有的事件
// 封装SelectionKey的方法
// JDK的selector存在的bug,空轮询的bug
// 有可能达到CPU 100%
select()方法,Netty空轮询计数器,默认是512次
rebuildSelector()
key.cancel
//轮询方法
processSelectedKeys()方法
关于Selector,在Netty把HashSet直接替换成数组省去了繁杂的操作。
第三阶段:新连接的接入处理
Nagle算法
NioSocketChannel OP_READ
Pipeline 最终放到全部是ChannelHandler
线程池下执行的。
Reactor 的线程模型有三种:单线程模型、多线程模型、主从多线程模型。首先来看一下单线程模型。
所谓单线程, 即 Acceptor 处理和 andler 处理都在同一个线程中处理。这个模型的坏处显而易见:当其中某个 Handler 阻塞时, 会导致其他所有的 Client 的 Handler 都得不到执行,并且更严重的是,Handler 的阻塞也会导致整个服务不能 接收新的 Client 请求(因为 Acceptor 也被阻塞了)。因为有这么多的缺陷,因此单线程 Reactor 模型应用场景比较少。
Reactor 的多线程模型与单线程模型的区别就是 Acceptor 是一个单独的线程处理,并且 有一组特定的 NIO 线程来负责各个客户端连接的 IO 操作。Reactor 多线程模型如下图所示:
Reactor 多线程模型有如下特点:
1、有专门一个线程,即 Acceptor 线程用于监听客户端的 TCP 连接请求。
2、客户端连接的 IO 操作都由一个特定的 NIO 线程池负责.每个客户端连接都与一个特定的 NIO 线程绑定,因此在这个 客户端连接中的所有 IO 操作都是在同一个线程中完成的。
3、客户端连接有很多,但是 NIO 线程数是比较少的,因此一个 NIO 线程可以同时绑定到多个客户端连接中。
接下来我们再来看一下 Reactor 的主从多线程模型。一般情况下, Reactor 的多线程模式已经可以很好的工作了, 但是我们想象一个这样的场景:如果我们的服务器需要同时处理大量的客户端连接请求或我们需要在客户端连接时, 进行一些权限的校验,那么单线程的 Acceptor 很有可能就处理不过来,造成了大量的客户端不能连接到服务器。
Reactor 的主从多线程模型就是在这样的情况下提出来的,它的特点是:服务器端接收客户端的连接请求不再是一 个线程,而是由一个独立的线程池组成。其线程模型如下图所示:
Reactor 的主从多线程模型和 Reactor 多线程模型很类似,只不过 Reactor 的主从多线程模型的 Acceptor 使用了线程池来处理大量的客户端请求。
- 1.单线程模型
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);
注意,我们实例化了一个 NioEventLoopGroup,然后接着我们调用 server.group(bossGroup)设置了服务器端的 EventLoopGroup。有人可能会有疑惑;我记得在启动服务器端的 Netty 程序时, 需要设置 bossGroup 和 workerGroup, 为何这里只设置一个 bossGroup?其实原因很简单,ServerBootstrap 重写了 group 方法:
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
因此当传入一个 group 时,那么 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup 了。这时,因为 bossGroup 和 workerGroup 就是同一个 NioEventLoopGroup,并且这个 NioEventLoopGroup 线程池数量只设置了 1 个线程,也就是说 Netty 中的 Acceptor 和后续的所有客户端连接的 IO 操作都是在一个线程中处理的。那么对应到 Reactor 的线程模型中,我们这样设置 NioEventLoopGroup 时,就相当于 Reactor
- 2.多线程模型
EventLoopGroup bossGroup = new NioEventLoopGroup(128);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);
- 3.主从线程模型
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
bossGroup 为主线程,而 workerGroup 中的线程是 CPU 核心数乘以 2,因此对应的到 Reactor 线程模型中,我们知道, 这样设置的 NioEventLoopGroup 其实就是 Reactor 主从多线程模型。
基本步骤如下:
1、EventLoopGroup(其实是 MultithreadEventExecutorGroup)内部维护一个类为 EventExecutor children 数组, 其大小是 nThreads,这样就初始化了一个线程池。
2、如果我们在实例化 NioEventLoopGroup 时,如果指定线程池大小,则 nThreads 就是指定的值,否则是 CPU 核数 * 2
3、在 MultithreadEventExecutorGroup 中会调用 newChild()抽象方法来初始化 children 数组。
4、抽象方法 newChild()实际是在 NioEventLoopGroup 中实现的,由它返回一个 NioEventLoop 实例。
5、初始化 NioEventLoop 主要属性:
provider:在 NioEventLoopGroup 构造器中通过 SelectorProvider 的 provider()方法获取 SelectorProvider。
selector:在 NioEventLoop 构造器中调用 selector = provider.openSelector()方法获取 Selector 对象。
NioEventLoop 的类层次结构图还是有些复杂的,不过我们只需要关注几个重要点即可。首先来看 NioEventLoop 的继 承链:NioEventLoop->SingleThreadEventLoop->SingleThreadEventExecutor->AbstractScheduledEventExecutor。 在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能,即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务。而在 SingleThreadEventLoop 中,又实现了任务队列的功 能,通过它,我们可以调用一个 NioEventLoop 实例的 execute()方法来向任务队列中添加一个 task,并由 NioEventLoop 进行调度执行。
通常来说,NioEventLoop 负责执行两个任务:
第一个任务是作为 IO 线程,执行与 Channel 相关的 IO 操作,包括调 用 Selector 等待就绪的 IO 事件、读写数据与数据的处理等;
第二个任务是作为任务队列,执行 taskQueue 中的任 务,例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的。
从 上 图 可 以 看 到 , SingleThreadEventExecutor 有 一 个 名 为 thread 的 Thread 类 型 字 段 , 这 个 字 段 就 是 与 SingleThreadEventExecutor 关联的本地线程。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
SingleThreadEventExecutor 启动时会调用 doStartThread()方法,然后调用 executor.execute() 方法,将当前线程赋值给 thread。在这个线程中所做的事情主要就是调用 SingleThreadEventExecutor.this.run()方法, 而因为 NioEventLoop 实现了这个方法,因此根据多态性,其实调用的是 NioEventLoop.run()方法。
在 Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联, 它们的关联过程如下:
从上图中我们可以看到,当调用 AbstractChannel$AbstractUnsafe.register()方法后,就完成了 Channel 和 EventLoop的关联。register()方法的具体实现如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
在 AbstractChannel$AbstractUnsafe.register() 方 法 中 , 会 将 一 个 EventLoop 赋 值 给 AbstractChannel 内 部 的 eventLoop 字段,这句代码就是完成 EventLoop与Channel的关联过程。
NioEventLoop 本身就是一个 SingleThreadEventExecutor,因此 NioEventLoop 的启动,其 实就是 NioEventLoop 所绑定的本地 Java 线程的启动。
按照这个思路,我们只需要找到在哪里调用了 SingleThreadEventExecutor 中 thread 字段的 start()方法就可以知道是 在 哪里启动的这个线程了 。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
STATE_UPDATER 是 SingleThreadEventExecutor 内部维护的一个属性,它的作用是标识当前的 thread 的状态。在初 始的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用 startThread()方法时,就会进入到 if 语句内, 进而调用到 thread.start()方法。而这个关键的 startThread()方法又是在哪里调用的呢?用方法调用关系反向查找功能,我们发现,startThread 是在 SingleThreadEventExecutor 的 execute()方法中调用的:
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
既然如此,那现在我们的工作就变为了寻找在哪里第一次调用了 SingleThreadEventExecutor 的 execute()方法.
在 注 册 channel 的 过 程 中 , 会 在 AbstractChannel$AbstractUnsafe 的 register()中调用 eventLoop.execute()方法,在 EventLoop 中进行 Channel 注册 代码的执行,AbstractChannel$AbstractUnsafe 的 register()部分代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
一路从 Bootstrap 的 bind()方法跟踪到 AbstractChannel$AbstractUnsafe 的 register()方法,整个代码都是在 主线程中运行的,因此上面的 eventLoop.inEventLoop()返回为 false,于是进入到 else 分支,在这个分支中调用了 eventLoop.execute()方法,而 NioEventLoop 没有实现 execute()方法,因此调用的是 SingleThreadEventExecutor 的execute()方法:
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = this.inEventLoop();
this.addTask(task);
if (!inEventLoop) {
this.startThread();
if (this.isShutdown()) {
boolean reject = false;
try {
if (this.removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException var6) {
}
if (reject) {
reject();
}
}
}
if (!this.addTaskWakesUp && immediate) {
this.wakeup(inEventLoop);
}
}private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = this.inEventLoop();
this.addTask(task);
if (!inEventLoop) {
this.startThread();
if (this.isShutdown()) {
boolean reject = false;
try {
if (this.removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException var6) {
}
if (reject) {
reject();
}
}
}
if (!this.addTaskWakesUp && immediate) {
this.wakeup(inEventLoop);
}
}
我们已经分析过了,inEventLoop == false,因此执行到 else 分支,在这里就调用 startThread()方法来启动 SingleThreadEventExecutor 内部关联的 Java 本地线程了。
总结一句话:当 EventLoop 的 execute()第一次被调用时,就会触发 startThread()方法的调用,进而导致 EventLoop 所对应的 Java 本地线程启动。