Netty 启动源码分析
ServerBootstrap 的启动
1 | public static void main(String[] args) { |
从bind
方法进入调试最终会进入到 AbstractBootstrap 里的doBind
方法,其中有两个点:
AbstractBootstrap.initAndRegister:初始化 ServerSocketChannel 并将 ServerSocketChannel 注册到 selector 的方法
AbstractBootstrap.doBind0:ServerSocketChannel 绑定端口号的方法
1 | private ChannelFuture doBind(final SocketAddress localAddress) { |
AbstractBootstrap.initAndRegister
1 | final ChannelFuture initAndRegister() { |
首先会调用 ChannelFactory 工厂类的方法得到一个 Channel,相当于 NIO 中的:
1 | ServerSocketChannel ssc = ServerSocketChannel.open(); |
然后进入 init 方法,关键点在于利用刚刚得到的 channel 对象,创建了一个 pipeline,并且添加了一个 ChannelInitializer 处理器,监听初始化事件,在初始化事件中,使用 eventLoop 所在的 NIO 线程,提交一个任务,向 pipeline 中新增一个 ServerBootstrapAcceptor 用于处理新连接
1 | void init(Channel channel) { |
外层的p.addLast(new ChannelInitializer<Channel>()
添加进去的 ChannelInitializer 中的 initChannel 方法要在 AbstractUnsafe 的 register0 中 pipeline.invokeHandlerAddedIfNeeded();方法调用时被执行
扩展:里面的pipeline.addLast(handler)
添加的就是我们自己配置的 ServerBootstrap.handler 里面的 ChannelInitializer,但是这里的 initChannel 方法就直接执行,因为 DefaultChannelPipeline 中的 registered 已经变为了 true,这个值也是因为前面调用了 invokeHandlerAddedIfNeeded 最终在 DefaultChannelPipeline 中的 callHandlerAddedForAllHandlers 中变为了 true
之后config().group().register(channel)
这段代码的的调用流程图大致如下:
AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)的主要是使用 eventLoop.inEventLoop()判断当前线程是否是 eventLoop 的线程,如果在当前线程直接执行 register0(promise),否则交给 eventLoop 起一个新的线程执行
在这里现然不是,因为从上面的流程图可以看出创建新的线程是在 SingleThreadEventExecutor.doStartThread()方法里面实现的,具体是在 ThreadPerTaskExecutor.execute(Runnable command)中实现的
1 | if (eventLoop.inEventLoop()) { |
SingleThreadEventExecutor.execute(Runnable task, boolean immediate)主要将任务添加到 taskQueue 中,并且判断当前线程是否是 eventLoop 的线程,如果不是就开启一个新的线程去执行 taskQueue 中的任务,这里添加到 taskQueue 中的任务是在 AbstractUnsafe.register 方法中匿名实现的,执行 register0(promise)
之后就是就是在 eventLoop 中线程的执行的流程逻辑了,执行逻辑的流程如下
在 SingleThreadEventExecutor.doStartThread()启动一个新线程并将线程赋值给变量 thread,在这之后在该线程内 eventLoop.inEventLoop()返回的都是 true 了
SingleThreadEventExecutor.runAllTasks(long timeoutNanos)将 taskQueue 的任务取出来并执行
之后就是 register0(ChannelPromise promise)的执行流程
- AbstractNioChannel.doRegister()将 NIO 的 ServerSocketChannel 注册到 Selector 中
- DefaultChannelPipeline.invokeHandlerAddedIfNeeded() 执行 pendingHandlerCallbackHead 中的任务
- AbstractUnsafe.safeSetSuccess(ChannelPromise promise) 唤醒因为 ChannelPromise(继承了 ChannelFuture)等待的线程,并且执行相应的 listeners
AbstractNioChannel.doRegister
1 | protected void doRegister() throws Exception { |
执行selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
将 NIO 的 ServerSocketChannel 注册到 Selector 中,并且将当前的 NioServerSocketChannel 作为 attachment 加入到 NIO 的 SelectionKey 中,相当于 NIO 里面的:
1 | SelectionKey sscKey = ssc.register(selector, 0, attach); |
DefaultChannelPipeline.invokeHandlerAddedIfNeeded
DefaultChannelPipeline.invokeHandlerAddedIfNeeded() 会在 callHandlerAddedForAllHandlers()执行 pendingHandlerCallbackHead 中的任务
1 | final void invokeHandlerAddedIfNeeded() { |
1 | private void callHandlerAddedForAllHandlers() { |
在这里 pendingHandlerCallbackHead 中就有一个在前面 ServerBootstrap.init(Channel channel)中通过p.addLast(new ChannelInitializer<Channel>()
添加进去的 PendingHandlerAddedTask
1 | public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { |
1 | private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { |
PendingHandlerAddedTask.execute()的执行流程如下
最终调用的是 ServerBootstrap 中匿名实现的 ChannelInitializer.initChannel(C ch),到了这里回到前面说的p.addLast(new ChannelInitializer<Channel>()
添加进去的 ChannelInitializer,执行它的 initChannel(final Channel ch)将我们在 ServerBootstrap.handler 里面的 ChannelInitializer 添加到 DefaultChannelPipeline 中的 AbstractChannelHandlerContext 的调用链中,但是和前面 addLast 不同的是这里因为在 DefaultChannelPipeline.callHandlerAddedForAllHandlers()已经将 registered 变为了 true,直接就调用 callHandlerAdded0(final AbstractChannelHandlerContext ctx),后面的逻辑就和上面说的一样
在 ChannelInitializer.initChannel(ChannelHandlerContext ctx)中执行完 initChannel(C ch)后会调用 pipeline.remove(this),最终在 DefaultChannelPipeline.atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx)将 ChannelInitializer 从 DefaultChannelPipeline 中 AbstractChannelHandlerContext 调用链中移除
1 | private void addLast0(AbstractChannelHandlerContext newCtx) { |
1 | private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) { |
AbstractUnsafe.safeSetSuccess
RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)是使用 AtomicReferenceFieldUpdater.compareAndSet(T obj, V expect, V update)将 DefaultPromise 的 result 设置为 DefaultPromise.SUCCESS(因为 setValue0(result == null ? SUCCESS : result))
DefaultPromise.checkNotifyWaiters()判断 waiters 的数量是否大于 0,如果大于 0 就唤醒等待的线程(ChannelPromise.sync()中会调用 ChannelPromise.ChannelPromise 增加 waiters 和调用 Object.wait()等待唤醒)并且有 listeners 的话会返回 true,之后就会调用对应的 GenericFutureListener
DefaultPromise.notifyListeners()最终会调用 DefaultChannelPromise.addListener(GenericFutureListener<? extends Future<? super Void>> listener)添加进去的 GenericFutureListener
需要说明一下的是 DefaultPromise.notifyListenersNow()的代码都会 NIO 的线程里面执行,因为其判断逻辑是如果当前线程是 NIO 线程就直接用当前线程执行,如果不是就将其作为任务加入到 taskQueue 中等待执行
AbstractBootstrap.doBind0
inbound 事件会从 head 开始往后找到最后一个 inboundHandler 并执行事件相关方法,outbound 事件会从 tail 开始往前找到最后一个 outboundHandler 并执行事件相关方法
根据前面 outbound 事件的调用链逻辑发现最终会执行到 HeadContext.bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise),继续追下去找到 AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise),里面有两个需要注意的点
- NioServerSocketChannel.doBind(SocketAddress localAddress) 将 ServerSocketChannel 绑定到具体某个端口
- DefaultChannelPipeline.fireChannelActive()触发 inbound 事件调用对应 inboundHandler 对应的 channelActive(ChannelHandlerContext ctx)方法
- AbstractUnsafe.safeSetSuccess(ChannelPromise promise) 唤醒因为 ChannelPromise(继承了 ChannelFuture)等待的线程,并且执行相应的 listeners
NioServerSocketChannel.doBind
这里用的是 java8 所以会执行到 javaChannel().bind(localAddress, config.getBacklosg()),这里这些的代码就是 NIO 里面的:
1 | ssc.bind(new InetSocketAddress(8080)); |
DefaultChannelPipeline.fireChannelActive
从!wasActive && isActive()
这段代码看出 DefaultChannelPipeline.fireChannelActive()这个方法要在 channel 第一次 active 的情况下才会执行,isActive()最终就是判断 ServerSocketChannel 的 open 是否为 true 并且 ServerSocketChannel 对应的 ServerSocketAdaptor.isBound()是否为 true,这里 open 默认为 true,区别是两次 isActive()的调用分别在 javaChannel().bind(localAddress, config.getBacklosg())的前后
不过这里不是直接调用而是通过 AbstractUnsafe.invokeLater(Runnable task),将任务加入到 taskQueue,之后在 NIO 线程中 SingleThreadEventExecutor.runAllTasks 执行
DefaultChannelPipeline.fireChannelActive()的执行逻辑就是前面说的 inbound 事件调用链,执行所有 inboundHandler 对应的 channelActive(ChannelHandlerContext ctx)方法
不过需要注意的是 HeadContext 的 channelActive(ChannelHandlerContext ctx)中会调用 HeadContext.readIfIsAutoRead(),最终会调用到 AbstractNioChannel.doBeginRead(),其中selectionKey.interestOps(interestOps | readInterestOp)
(这里的 readInterestOp 就是 SelectionKey.OP_ACCEPT)相当于 NIO 中的:
1 | sscKey.interestOps(SelectionKey.OP_ACCEPT); |
AbstractUnsafe.safeSetSuccess
前面已经介绍了不再赘述
ServerBootstrap 启动中一些细节问题
EventLoop 何时被创建
在 NioEventLoopGroup(int nThreads) 构造器一路追踪,最后 MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object… args) 中调用 NioEventLoopGroup.newChild(Executor executor, Object… args)创建 EventLoop
在中间省略的过程中指定了一些 NioEventLoopGroup 和 NioEventLoop 的一些属性值,大致如下:
- selectorProvider 是 NIO 中 Selector 的提供者在
Selector.open()
和ServerSocketChannel.open()
中都有用到,默认是通过SelectorProvider.provider()
获取 - nThreads 是指定 NioEventLoopGroup 中有多少个 NioEventLoop,如果没有指定默认是使用
MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS
,这个默认是当前系统可使用的处理器数量乘以 2 - executor 每个 NioEventLoop 在 doStartThread()中创建线程的时候使用的 Executor,默认是
ThreadExecutorMap.apply(final Executor executor, final EventExecutor evntExecutor)
的匿名实现,其中参数中的 executor 默认是 ThreadPerTaskExecutor - rejectedExecutionHandler 是 NioEventLoop 中待执行任务 taskQueue 中任务超过一定数量之后的决策策略
- maxPendingTasks 指定 taskQueue 中最多可以存放的待执行任务数,通过设置
SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS
指定,默认是 16 - selectStrategyFactory 默认是 DefaultSelectStrategyFactory,构建出的 DefaultSelectStrategy 中的
calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)
会在 NioEventLoop 中的 run()方法中用到
提交普通任务时是否会结束 select 阻塞
select 方法默认阻塞的时间是获取定时任务队列 AbstractScheduledEventExecutor.scheduledTaskQueue 中的第一个任务的 deadlineNanos,如果定时任务队列中没有任务那么 curDeadlineNanos 就是 NioEventLoop.NONE,最终在 select 方法中会一直阻塞,直到被唤醒
前面config().group().register(channel)调用流程那张图中的 SingleThreadEventExecutor.execute(Runnable task)就是提交提交任务的方法,该法会继续调用 SingleThreadEventExecutor.execute(Runnable task, boolean immediate) ,其中 immediate 主要取决于任务是否不是 LazyRunnable,后面的 wakesUpForTask(task)在 SingleThreadEventExecutor 固定为 true
SingleThreadEventExecutor.execute(Runnable task, boolean immediate)中的 addTaskWakesUp 在 NioEventLoop 中默认就是 false,所以其实主要还是取决于前面说的 immediate 这个值
只有不是当前 NIO 线程并且 nextWakeupNanos 的值不是 AWAKE,才会调用 selector.wakeup()唤醒,所以多个线程多次调用 NioEventLoop.wakeup(boolean inEventLoop)下只有第一个线程会起到唤醒的效果,其他线程的调用都是无效操作浪费性能
扩展说下 addTaskWakesUp 的作用就是用于判断是否只要添加了任务就可以直接被唤醒,在 NioEventLoop 中它默认为 false 是因为这里线程的阻塞是因为调用了 Selector 的 select 方法,而 DefaultEventLoop 是任务队列中没有任务所以阻塞(在 SingleThreadEventExecutor.takeTask()中执行了 taskQueue.take()阻塞),只需要提交任务进去就会唤醒,所以 DefaultEventLoop 中 addTaskWakesUp 是 true
NioEventLoop.run()中的循环何时才会进入 SelectStrategy.SELECT 分支
前面提到DefaultSelectStrategy.calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)在这里就是用于获取循环需要进入哪个分支的判断依据,如果任务队列中没有任务就会进入 SelectStrategy.SELECT 分支,如果任务队列中有任务就会调用 selectSupplier.get()(这里 selectSupplier 是在 NioEventLoop 的匿名实现),最终调用的的是 Selector.selectNow()查看当 selector 上是否有 IO 事件,有则一并返回