1、接受和发送ByteBuffer使用堆外直接内存进行Socket读写。
2、提供了组合Buffer对象,可以聚合多个ByteBuffer对象。
3、transferTo直接将文件缓冲区的数据发送到目标Channel。
1、Pooled和UnPooled(池化和非池化)
2、UnSafe和非UnSafe(底层读写与应用程序读写)
3、Heap和Direct(堆内存和对外内存)
1、Reactor单线程模型
2、Reactor多线程模型
3、主从Reactor多线程模型
1、volatile的大量、正确使用;
2、CAS和原子类的广泛使用;
3、线程安全容器的使用;
4、通过读写锁提升并发性能。
1、序列化后的码流大小(网络带宽的占用);
2、序列化&反序列化的性能(CPU资源占用);
3、是否支持跨语言(异构系统的对接和开发语言切换)
-
NewChannel()创建Channel对象
public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
-
init()初始化预设参数
// ServerBootstrap.java void init(Channel channel) { setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
// NioServerSocketChannel.java public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
// AbstractChannel.java protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
-
doBind()绑定端口
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
-
register()注册到Selector
AbstractChannel.this.eventLoop = eventLoop;
// Provider 获得一个Selector
// 把Server注册到一个Selector
// 显式的设置一个ConfigureBlocking(false)
// 调用原生的bind方法
// 注册一个空事件
NioServerSocketChannel
ServerSocketChannel
attr(NioServerSocketChannel)
// 便于扩展
API是原生NIO设计的,又是基于系统网卡设计的