Skip to content

Latest commit

 

History

History
916 lines (702 loc) · 44.4 KB

File metadata and controls

916 lines (702 loc) · 44.4 KB

Netty大动脉Pipeline

1.Pipeline设计原理

1.1Channel与ChannelPipeline

相信大家都已经知道,在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:

image-20220409224816686

通过上图我们可以看到,一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表。这个链表的头是 HeadContext,链表的尾是 TailContext,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。

上面的图示给了我们一个对 ChannelPipeline 的直观认识,但是实际上 Netty 实现的 Channel 是否真的是这样的呢? 我们继续用源码说话。在前我们已经知道了一个 Channel 的初始化的基本过程,下面我们再回顾一下。下面的代码是 AbstractChannel 构造器:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

AbstractChannel 有一个 pipeline 字段,在构造器中会初始化它为 DefaultChannelPipeline 的实例。这里的代码就印证 了一点:每个 Channel 都有一个 ChannelPipeline。接着我们跟踪一下 DefaultChannelPipeline 的初始化过程,首先进 入到 DefaultChannelPipeline 构造器中:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }

在 DefaultChannelPipeline 构造器中,首先将与之关联的 Channel 保存到字段 channel 中。然后实例化两个 ChannelHandlerContext:一个是 HeadContext 实例 head,另一个是 TailContext 实例 tail。接着将 head 和 tail 互相 指向, 构成一个双向链表。

特别注意的是:我们在开始的示意图中 head 和 tail 并没有包含 ChannelHandler,这是因为 HeadContext 和 TailContext 继承于 AbstractChannelHandlerContext 的同时也实现了 ChannelHandler 接口了,因此它们有 Context 和 Handler 的双重属性。

1.2 再探ChannelPipeline的初始化

我们已经对 ChannelPipeline 的初始化有了一个大致的了解,不过当时重点没有关注 ChannelPipeline,因此没有 深入地分析它的初始化过程。那么下面我们就来看一下具体的 ChannelPipeline 的初始化都做了哪些工作吧。先回顾一 下,在实例化一个 Channel 时,会伴随着一个 ChannelPipeline 的实例化,并且此 Channel 会与这个 ChannelPipeline 相互关联,这一点可以通过 NioSocketChannel 的父类 AbstractChannel 的构造器予以佐证:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

当实例化一个 NioSocketChannel 是,其 pipeline 字段就是我们新创建的 DefaultChannelPipeline 对象,那么我们就来 看一下 DefaultChannelPipeline 的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
    this.voidPromise = new VoidChannelPromise(channel, true);
    this.tail = new DefaultChannelPipeline.TailContext(this);
    this.head = new DefaultChannelPipeline.HeadContext(this);
    this.head.next = this.tail;
    this.tail.prev = this.head;
}

可以看到,在 DefaultChannelPipeline 的构造方法中,将传入的 channel 赋值给字段 this.channel,接着又实例化了两个特殊的字段:tail 与 head,这两个字段是一个双向链表的头和尾。其实在 DefaultChannelPipeline 中,维护了一个 以 AbstractChannelHandlerContext 为节点的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。再回顾一下 head 和 tail 的类层次结构:

image-20220409230044046

从类层次结构图中可以很清楚地看到,head 实现了 ChannelInboundHandler,而 tail 实现了 ChannelOutboundHandler 接口,并且它们都实现了 ChannelHandlerContext 接口, 因此可以说 head 和 tail 即是一个 ChannelHandler,又是一个 ChannelHandlerContext。接着看 HeadContext 构造器中的代码:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

它调用了父类 AbstractChannelHandlerContext 的构造器,并传入参数 inbound = false,outbound = true。而 TailContext 的构造器与 HeadContext 正好相反,它调用了父类 AbstractChannelHandlerContext 的构造器,并传入参 数 inbound = true,outbound = false。也就是说 header 是一个 OutBoundHandler,而 tail 是一个 InboundHandler, 关于这一点,大家要特别注意,因为在后面的分析中,我们会反复用到 inbound 和 outbound 这两个属性。

1.3 ChannelInitializer的添加

前面我们已经分析过 Channel 的组成,其中我们了解到,最开始的时候 ChannelPipeline 中含有两个 ChannelHandlerContext(同时也是 ChannelHandler),但是这个 Pipeline 并不能实现什么特殊的功能,因为我们还 没有给它添加自定义的 ChannelHandler。通常来说,我们在初始化 Bootstrap,会添加我们自定义的 ChannelHandler, 就以我们具体的客户端启动代码片段来举例:

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChatClientHandler(nickName));
        }
	});

上面代码的初始化过程,相信大家都不陌生。在调用 handler 时,传入了 ChannelInitializer 对象,它提供了一个 initChannel()方法给我我们初始化 ChannelHandler>.ChannelInitializer 实现了 ChannelHandler,那么它是在什么时候添加到 ChannelPipeline 中的呢?通过代码跟踪,我 们发现它是在 Bootstrap 的 init()方法中添加到 ChannelPipeline 中的,其代码如下:

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());
    //略去 N 句代码
}

从上面的代码可见,将 handler()返回的 ChannelHandler 添加到 Pipeline 中,而 handler()返回的其实就是我们在初始 化 Bootstrap 时通过 handler()方法设置的 ChannelInitializer 实例,因此这里就是将 ChannelInitializer 插入到了 Pipeline 的末端。此时 Pipeline 的结构如下图所示:

image-20220409230614620

在 Bootstrap 的 init()方法中会调用 p.addLast()方法,将 ChannelInitializer 插入到链表的末端:

	public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

	private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

addLast()有很多重载的方法,我们只需关注这个比较重要的方法就行。上面的 addLast()方法中,首先检查 ChannelHandler 的名字是否是重复,如果不重复,则调用 newContex()方法为这个 Handler 创建一个对应的 DefaultChannelHandlerContext 实例,并与之关联起来(Context 中有一个 handler 属性保存着对应的 Handler 实例)。 为了添加一个 handler 到 pipeline 中,必须把此 handler 包装成 ChannelHandlerContext。因此在上面的代码中我们 可以看到新实例化了一个 newCtx 对象,并将 handler 作为参数传递到构造方法中。那么我们来看一下实例化的 DefaultChannelHandlerContext 到底有什么玄机吧。首先看它的构造器:

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
    throw new NullPointerException("handler");
    }
    this.handler = handler;
}

在 DefaultChannelHandlerContext 的构造器中,调用了两个很有意思的方法:isInbound()与 isOutbound(),

private static boolean isInbound(ChannelHandler handler) {
	return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
	return handler instanceof ChannelOutboundHandler;
}

从源码中可以看到,当一个 handler 实现了 ChannelInboundHandler 接口,则 isInbound 返回 true;类似地,当一个 handler 实现了 ChannelOutboundHandler 接口,则 isOutbound 就返回 true。而这两个 boolean 变量会传递到父类 AbstractChannelHandlerContext 中,并初始化父类的两个字段:inbound 与 outbound。

image-20220409231016658

从类图中可以清楚地看到,ChannelInitializer 仅仅实现了 ChannelInboundHandler 接口,因此这里实例化的 DefaultChannelHandlerContext 的 inbound = true,outbound = false。

兜了一圈,不就是 inbound 和 outbound 两个字段嘛,为什么需要这么大费周折地分析一番?其实这两个字段关系到 pipeline 的事件的流向与分类,因此是十分关键的,不过我在这里先卖个关子, 后面我们再来详细分析这两个字段所起 的作用。至此, 我们暂时先记住一个结论:ChannelInitializer 所对应的 DefaultChannelHandlerContext 的 inbound = true,outbound = false。

当创建好 Context 之后,就将这个 Context 插入到 Pipeline 的双向链表中.

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

1.4 自定义 ChannelHandler 的添加过程

1、首先在 AbstractBootstrap 的 initAndRegister()中,通过 group().register(channel),调用 MultithreadEventLoopGroup 的 register()方法。

2、在 MultithreadEventLoopGroup 的 register()中调用 next()获取一个可用的 SingleThreadEventLoop,然后调用 它的 register()方法。

3、在 SingleThreadEventLoop 的 register()方法中,通过 channel.unsafe().register(this, promise)方法获取 channel 的 unsafe()底层 IO 操作对象,然后调用它的 register()。

4、在 AbstractUnsafe 的 register()方法中,调用 register0()方法注册 Channel 对象。

5、在 AbstractUnsafe 的 register0()方法中,调用 AbstractNioChannel 的 doRegister()方法。

6、AbstractNioChannel 的 doRegister()方法调用 javaChannel().register(eventLoop().selector, 0, this)将 Channel 对应的 Java NIO 的 SockerChannel 对象注册到一个 eventLoop 的 Selector 中,并且将当前 Channel 作为 attachment。

而我们自定义 ChannelHandler 的添加过程,发生在 AbstractUnsafe 的 register0()方法中,在这个方法中调用了 pipeline.fireChannelRegistered()方法,其代码实现如下:

public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

再看 AbstractChannelHandlerContext 的 invokeChannelRegistered()方法:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    	next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
            	next.invokeChannelRegistered();
            }
        });
    }
}

这个代码会从 head 开始遍历 Pipeline 的双向链表,然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例。

ChannelInitializer 实现了 ChannelInboudHandler,因此它所对应 的 ChannelHandlerContext 的 inbound 属性就是 true,因此这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext 对象,如下图所示:

image-20220409231442978

当获取到 inbound 的 Context 后,就调用它的 invokeChannelRegistered()方法:

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
        	((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
        	notifyHandlerException(t);
    	}
    } else {
    	fireChannelRegistered();
    }
}

我们已经知道,每个 ChannelHandler 都和一个 ChannelHandlerContext 关联,我们可以通过 ChannelHandlerContext 获取到对应的 ChannelHandler。因此很明显,这里 handler()返回的对象其实就是一开始我们实例化的 ChannelInitializer 对象,并接着调用了 ChannelInitializer 的 channelRegistered()方法。看到这里, 应该会觉得有点眼 熟了。ChannelInitializer 的 channelRegistered()这个方法我们在一开始的时候已经接触到了,但是我们并没有深入地 分析这个方法的调用过程。下面我们来看这个方法中到底有什么玄机,继续看代码:

	public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();

            // We are done with init the Channel, removing all the state for the Channel now.
            removeState(ctx);
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }

	private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                if (!ctx.isRemoved()) {
                    ctx.pipeline().remove(this);
                }
            }
            return true;
        }
        return false;
    }

initChannel((C) ctx.channel())这个方法我们也很熟悉,它就是我们在初始化 Bootstrap 时,调用 handler 方法传入的匿 名内部类所实现的方法:

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChatClientHandler(nickName));
    }
});

因此,当调用这个方法之后, 我们自定义的 ChannelHandler 就插入到了 Pipeline,此时 Pipeline 的状态如下图所示:

image-20220409232038623

当添加完成自定义的 ChannelHandler 后,在 finally 代码块会删除自定义的 ChannelInitializer,也就是 remove(ctx)最 终调用 ctx.pipeline().remove(this),因此最后的 Pipeline 的状态如下:

image-20220409232056567

至此,自定义 ChannelHandler 的添加过程也分析得差不多了。

1.5 给ChannelHandler命名

不知道大家注意到没有,pipeline.addXXX 都有一个重载的方法,例如 addLast()它有一个重载的版本是:

ChannelPipeline addLast(String name, ChannelHandler handler);

第一个参数指定添加的 handler 的名字(更准确地说是 ChannelHandlerContext 的名字,说成 handler 的名字更便于理 解)。

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

这个方法会调用重载的 addLast()方法:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

第一个参数被设置为 null,我们不用关心它。第二参数就是这个 handler 的名字。看代码可知,在添加一个 handler 之前,需要调用 checkMultiplicity()方法来确定新添加的 handler 名字是否与已添加的 handler 名字重复。

1.6 ChannelHandler 默认命名规则

如果我们调用的是如下的 addLast()方法:

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

那么 Netty 就会调用 generateName()方法为新添加的 handler 自动生成一个默认的名字:

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
	private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

而 generateName()方法会接着调用 generateName0()方法来实际生成一个新的 handler 名字:

    private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
    }

默认命名的规则很简单,就是用反射获取 handler 的 simpleName 加上"#0",因此我们自定义 ChatClientHandler 的名 字就是 "ChatClientHandler#0"。

2.Pipeline的事件传播机制

我们已经知道 AbstractChannelHandlerContext 中有 inbound 和 outbound 两个 boolean 变量,分别用 于标识 Context 所对应的 handler 的类型,即:

1、inbound 为 true 是,表示其对应的 ChannelHandler 是 ChannelInboundHandler 的子类。

2、outbound 为 true 时,表示对应的 ChannelHandler 是 ChannelOutboundHandler 的子类。

Netty 中的传播事件可以分为两种:Inbound 事件和 Outbound 事件。如下是从 Netty 官网针对这两个事件的说明:

image-20220409233044251

inbound 事件和 outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而 outbound 刚好相反,是从上到下。并且 inbound 的传递方式是通过调用相应的 ChannelHandlerContext.fireIN_EVT()方法,而outbound 方法的的传递方式是通过调用 ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext 的 fireChannelRegistered()调用会发送一个 ChannelRegistered 的 inbound 给下一个 ChannelHandlerContext,而 ChannelHandlerContext 的 bind()方法调用时会发送一个 bind 的 outbound 事件给下一个 ChannelHandlerContext。

Inbound 事件传播方法有:

public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

Outbound 事件传播方法有:

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error occurs
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error occurs
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

大家应该发现了规律:inbound 类似于是事件回调(响应请求的事件),而 outbound 类似于主动触发(发起请求的 事件)。注意,如果我们捕获了一个事件,并且想让这个事件继续传递下去,那么需要调用 Context 对应的传播方法 fireXXX,例如:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接成功");
        ctx.fireChannelActive();
	}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("客户端关闭");
        ctx.close(promise);
    }
}

如上面的示例代码:MyInboundHandler 收到了一个 channelActive 事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用 ctx.fireChannelActive()方法。接下来我们可以用一个代码案例来感受一下,编写如下代码:

2.1 Outbound 事件传播方式

Outbound 事件都是请求事件(request event),即请求某件事情的发生,然后通过 Outbound 事件进行通知。 Outbound 事件的传播方向是 tail -> customContext -> head。

我们接下来以 connect 事件为例,分析一下 Outbound 事件的传播机制。

首先,当用户调用了 Bootstrap 的 connect()方法时,就会触发一个 Connect 请求事件,此调用会触发如下调用链:

image-20220409233549464

继续跟踪,我们就发现 AbstractChannel 的 connect()其实由调用了 DefaultChannelPipeline 的 connect()方法:

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
	return pipeline.connect(remoteAddress, promise);
}

而 pipeline.connect()方法的实现如下:

public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
	return tail.connect(remoteAddress, promise);
}

可以看到,当 outbound 事件(这里是 connect 事件)传递到 Pipeline 后,它其实是以 tail 为起点开始传播的。

而 tail.connect()其实调用的是 AbstractChannelHandlerContext 的 connect()方法:

	public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");

        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }

findContextOutbound()方法顾名思义,它的作用是以当前 Context 为起点,向 Pipeline 中的 Context 双向链表的前端 寻找第一个 outbound 属性为 true 的 Context(即关联 ChannelOutboundHandler 的 Context),然后返回。

findContextOutbound()方法代码实现如下:

	private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }

当我们找到了一个 outbound 的 Context 后,就调用它的 invokeConnect()方法,这个方法中会调用 Context 其关联的 ChannelHandler 的 connect()方法:

	private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

如果用户没有重写 ChannelHandler 的 connect()方法,那么会调用 ChannelOutboundHandlerAdapter 的 connect() 实现:

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
	ctx.connect(remoteAddress, localAddress, promise);
}

我们看到,ChannelOutboundHandlerAdapter 的 connect()仅仅调用了 ctx.connect(),而这个调用又回到了: Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect 这样的循环中,直到 connect 事件传递到 DefaultChannelPipeline 的双向链表的头节点,即 head 中。为什么会传递 到 head 中呢?回想一下,head 实现了 ChannelOutboundHandler,因此它的 outbound 属性是 true。 因为 head 本身既是一个 ChannelHandlerContext,又实现了 ChannelOutboundHandler 接口,因此当 connect()消息 传递到 head 后,会将消息转递到对应的 ChannelHandler 中处理,而 head 的 handler()方法返回的就是 head 本身:

public ChannelHandler handler() {
	return this;
}

因此最终 connect()事件是在 head 中被处理。head 的 connect()事件处理逻辑如下:

public void connect(
				ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
	unsafe.connect(remoteAddress, localAddress, promise);
}

到这里, 整个 connect()请求事件就结束了。下图中描述了整个 connect()请求事件的处理过程。

image-20220409234400744

我们仅仅以 connect()请求事件为例,分析了 outbound 事件的传播过程,但是其实所有的 outbound 的事件传播都遵 循着一样的传播规律,小伙伴们可以试着分析一下其他的 outbound 事件,体会一下它们的传播过程。

2.2 Inbound 事件传播方式

Inbound 事件和 Outbound 事件的处理过程是类似的,只是传播方向不同。

Inbound 事件是一个通知事件,即某件事已经发生了,然后通过 Inbound 事件进行通知。Inbound 通常发生在 Channel 的状态的改变或 IO 事件就绪。

Inbound 的特点是它传播方向是 head -> customContext -> tail。

上面我们分析了 connect()这个 Outbound 事件,那么接着分析 connect()事件后会发生什么 Inbound 事件,并最终找到 Outbound 和 Inbound 事件之间的联系。当 connect()这个 Outbound 传播到 unsafe 后,其实是在 AbstractNioUnsafe 的 connect()方法中进行处理的

public final void connect(
    final SocketAddress remoteAddress,
    final SocketAddress localAddress, final ChannelPromise promise) {
    if (doConnect(remoteAddress, localAddress)) {
    	fulfillConnectPromise(promise, wasActive);
    } else {
    	...
    }
}

在 AbstractNioUnsafe 的 connect()方法中,首先调用 doConnect()方法进行实际上的 Socket 连接,当连接上后会调用 fulfillConnectPromise()方法:

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (!wasActive && active) {
    	pipeline().fireChannelActive();
    }
}

我们看到,在 fulfillConnectPromise()中,会通过调用 pipeline().fireChannelActive()方法将通道激活的消息(即 Socket 连 接成功)发送出去。而这里,当调用 pipeline.fireXXX 后,就是 Inbound 事件的起点。因此当调用 pipeline().fireChannelActive()后,就产生了一个 ChannelActive Inbound 事件,我们就从这里开始看看这个 Inbound 事件是怎么传播的.

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

果然, 在 fireChannelActive()方法中,调用的是 head.invokeChannelActive(),因此可以证明 Inbound 事件在 Pipeline 中传输的起点是 head。那么,在 head.invokeChannelActive()中又做了什么呢?

    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

1、首先调用 findContextInbound(),从 Pipeline 的双向链表中中找到第一个属性 inbound 为 true 的 Context,然后将 其返回。

2、调用 Context 的 invokeChannelActive()方法,invokeChannelActive()方法源码如下:

    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelActive();
        }
    }

这个方法和 Outbound 的对应方法(如:invokeConnect()方法)如出一辙。与 Outbound 一样,如果用户没有重写 channelActive() 方法,那就会调用 ChannelInboundHandlerAdapter 的 channelActive()方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
	ctx.fireChannelActive();
}

同样地, 在 ChannelInboundHandlerAdapter 的 channelActive()中,仅仅调用了 ctx.fireChannelActive()方法,因此就 会进入 Context.fireChannelActive() -> Connect.findContextInbound() -> nextContext.invokeChannelActive() -> nextHandler.channelActive() -> nextContext.fireChannelActive()这样的循环中。同理,tail 本身既实现了 ChannelInboundHandler 接口,又实现了 ChannelHandlerContext 接口,因此当 channelActive()消息传递到 tail 后, 会将消息转递到对应的 ChannelHandler 中处理,而 tail 的 handler()返回的就是 tail 本身:

public ChannelHandler handler() {
	return this;
}

因此 channelActive Inbound 事件最终是在tail中处理的,我们看一下它的处理方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

TailContext 的 channelActive()方法是空的。如果大家自行查看 TailContext 的 Inbound 处理方法时就会发现,它们的 实现都是空的。可见,如果是 Inbound,当用户没有实现自定义的处理器时,那么默认是不处理的。下图描述了 Inbound 事件的传输过程:

image-20220409235122677

2.3 Pipeline 事件传播小结

Outbound 事件总结:

1、Outbound 事件是请求事件(由 connect()发起一个请求,并最终由 unsafe 处理这个请求)。

2、Outbound 事件的发起者是 Channel。

3、Outbound 事件的处理者是 unsafe。

4、Outbound 事件在 Pipeline 中的传输方向是 tail -> head。

5、在 ChannelHandler 中处理事件时,如果这个 Handler 不是最后一个 Handler,则需要调用 ctx 的方法(如: ctx.connect()方法)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。

6、Outbound 事件流:Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT() -> nextHandler.OUT_EVT() ->

Inbound 事件总结:

1、Inbound 事件是通知事件,当某件事情已经就绪后,通知上层。

2、Inbound 事件发起者是 unsafe。

3、Inbound 事件的处理者是 Channe,如果用户没有实现自定义的处理方法,那么 Inbound 事件默认的处理者是 TailContext,并且其处理方法是空实现。

4、Inbound 事件在 Pipeline 中传输方向是 head -> tail。

5、在 ChannelHandler 中处理事件时,如果这个 Handler 不是最后一个 Handler,则需要调用 ctx.fireIN_EVT()事 件(如:ctx.fireChannelActive()方法)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。

6、Outbound 事件流:Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() -> nextHandler.IN_EVT() -> nextContext.fireIN_EVT().

outbound 和 inbound 事件设计上十分相似,并且 Context 与 Handler 直接的调用关系也容易混淆,因此我们在阅读 这里的源码时,需要特别的注意。

3.Handler各种姿势

3.1 ChannelHandlerContext

每个 ChannelHandler 被添加到 ChannelPipeline 后,都会创建一个 ChannelHandlerContext 并与之创建的 ChannelHandler 关联绑定。ChannelHandlerContext 允许 ChannelHandler 与其他的 ChannelHandler 实现进行交互。 ChannelHandlerContext 不会改变添加到其中的 ChannelHandler,因此它是安全的。下图描述了 ChannelHandlerContext、ChannelHandler、ChannelPipeline 的关系:

image-20220410001725635

3.2 Channel 的生命周期

Netty 有一个简单但强大的状态模型,并完美映射到 ChannelInboundHandler 的各个方法。下面是 Channel 生命周期 中四个不同的状态:

image-20220410001816937

一个 Channel 正常的生命周期如下图所示。随着状态发生变化相应的事件产生。这些事件被转发到 ChannelPipeline 中的 ChannelHandler 来触发相应的操作。

image-20220410001839486

3.3 ChannelHandler 常用的 API

image-20220410001916084

Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有的类型的父类是ChannelHandler。ChannelHandler 提供了在其生命周期内添加或从 ChannelPipeline 中删除的方法。

image-20220410002159571

Netty 还提供了一个实现了 ChannelHandler 的抽象类 ChannelHandlerAdapter。ChannelHandlerAdapter 实现了父 类的所有方法,基本上就是传递事件到 ChannelPipeline 中的下一个 ChannelHandler 直到结束。我们也可以直接继承 于 ChannelHandlerAdapter,然后重写里面的方法。

3.4 ChannelInboundHandler

ChannelInboundHandler 提供了一些方法再接收数据或 Channel 状态改变时被调用。下面是 ChannelInboundHandler 的一些方法:

image-20220410002246343

image-20220410002300760

Netty 提 供 了 一 个 实 现 了 ChannelInboundHandler 接 口 并 继 承 ChannelHandlerAdapter 的 类 : ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter 实现了 ChannelInboundHandler 的所有方法, 作用就是处理消息并将消息转发到 ChannelPipeline 中的下一个 ChannelHandler。ChannelInboundHandlerAdapter 的 channelRead() 方 法 处 理 完 消 息 后 不 会 自 动 释 放 消 息 , 若 想 自 动 释 放 收 到 的 消 息 , 可 以 使 用 SimpleChannelInboundHandler,看下面的代码:

public class UnreleaseHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //手动释放消息
        ReferenceCountUtil.release(msg);
    }
}

SimpleChannelInboundHandler 会自动释放消息:

public class ReleaseHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    //不需要手动释放
    }
}

ChannelInitializer 用来初始化 ChannelHandler,将自定义的各种 ChannelHandler 添加到 ChannelPipeline 中。