OUZHANBO

对于我这种菜鸡来说,毕业等于失业

0%

Netty启动源码分析

Netty 启动源码分析

ServerBootstrap 的启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) {

NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
System.out.println(ch);
}
})
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
//主要关注的是bind方法的底层逻辑
ChannelFuture sync = serverBootstrap.bind(8000).sync();
sync.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("operationComplete");
}
});
sync.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

bind 方法进入调试最终会进入到 AbstractBootstrap 里的doBind方法,其中有两个点:

  • AbstractBootstrap.initAndRegister:初始化 ServerSocketChannel 并将 ServerSocketChannel 注册到 selector 的方法

  • AbstractBootstrap.doBind0:ServerSocketChannel 绑定端口号的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private ChannelFuture doBind(final SocketAddress localAddress) {
//重点关注点
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
//重点关注点
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

AbstractBootstrap.initAndRegister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}

首先会调用 ChannelFactory 工厂类的方法得到一个 Channel,相当于 NIO 中的:

1
ServerSocketChannel ssc = ServerSocketChannel.open();

然后进入 init 方法,关键点在于利用刚刚得到的 channel 对象,创建了一个 pipeline,并且添加了一个 ChannelInitializer 处理器,监听初始化事件,在初始化事件中,使用 eventLoop 所在的 NIO 线程,提交一个任务,向 pipeline 中新增一个 ServerBootstrapAcceptor 用于处理新连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

外层的p.addLast(new ChannelInitializer<Channel>()添加进去的 ChannelInitializer 中的 initChannel 方法要在 AbstractUnsafe 的 register0 中 pipeline.invokeHandlerAddedIfNeeded();方法调用时被执行

扩展:里面的pipeline.addLast(handler) 添加的就是我们自己配置的 ServerBootstrap.handler 里面的 ChannelInitializer,但是这里的 initChannel 方法就直接执行,因为 DefaultChannelPipeline 中的 registered 已经变为了 true,这个值也是因为前面调用了 invokeHandlerAddedIfNeeded 最终在 DefaultChannelPipeline 中的 callHandlerAddedForAllHandlers 中变为了 true

之后config().group().register(channel)这段代码的的调用流程图大致如下:

图片丢失

AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)的主要是使用 eventLoop.inEventLoop()判断当前线程是否是 eventLoop 的线程,如果在当前线程直接执行 register0(promise),否则交给 eventLoop 起一个新的线程执行

在这里现然不是,因为从上面的流程图可以看出创建新的线程是在 SingleThreadEventExecutor.doStartThread()方法里面实现的,具体是在 ThreadPerTaskExecutor.execute(Runnable command)中实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

图片丢失

图片丢失

SingleThreadEventExecutor.execute(Runnable task, boolean immediate)主要将任务添加到 taskQueue 中,并且判断当前线程是否是 eventLoop 的线程,如果不是就开启一个新的线程去执行 taskQueue 中的任务,这里添加到 taskQueue 中的任务是在 AbstractUnsafe.register 方法中匿名实现的,执行 register0(promise)

图片丢失

图片丢失

之后就是就是在 eventLoop 中线程的执行的流程逻辑了,执行逻辑的流程如下

图片丢失

在 SingleThreadEventExecutor.doStartThread()启动一个新线程并将线程赋值给变量 thread,在这之后在该线程内 eventLoop.inEventLoop()返回的都是 true 了

SingleThreadEventExecutor.runAllTasks(long timeoutNanos)将 taskQueue 的任务取出来并执行

图片丢失

之后就是 register0(ChannelPromise promise)的执行流程

图片丢失

  • AbstractNioChannel.doRegister()将 NIO 的 ServerSocketChannel 注册到 Selector 中
  • DefaultChannelPipeline.invokeHandlerAddedIfNeeded() 执行 pendingHandlerCallbackHead 中的任务
  • AbstractUnsafe.safeSetSuccess(ChannelPromise promise) 唤醒因为 ChannelPromise(继承了 ChannelFuture)等待的线程,并且执行相应的 listeners
AbstractNioChannel.doRegister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

执行selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);将 NIO 的 ServerSocketChannel 注册到 Selector 中,并且将当前的 NioServerSocketChannel 作为 attachment 加入到 NIO 的 SelectionKey 中,相当于 NIO 里面的:

1
SelectionKey sscKey = ssc.register(selector, 0, attach);
DefaultChannelPipeline.invokeHandlerAddedIfNeeded

DefaultChannelPipeline.invokeHandlerAddedIfNeeded() 会在 callHandlerAddedForAllHandlers()执行 pendingHandlerCallbackHead 中的任务

1
2
3
4
5
6
7
8
9
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;

// This Channel itself was registered.
registered = true;

pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}

// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}

在这里 pendingHandlerCallbackHead 中就有一个在前面 ServerBootstrap.init(Channel channel)中通过p.addLast(new ChannelInitializer<Channel>()添加进去的 PendingHandlerAddedTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;

PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

PendingHandlerAddedTask.execute()的执行流程如下

图片丢失

最终调用的是 ServerBootstrap 中匿名实现的 ChannelInitializer.initChannel(C ch),到了这里回到前面说的p.addLast(new ChannelInitializer<Channel>()添加进去的 ChannelInitializer,执行它的 initChannel(final Channel ch)将我们在 ServerBootstrap.handler 里面的 ChannelInitializer 添加到 DefaultChannelPipeline 中的 AbstractChannelHandlerContext 的调用链中,但是和前面 addLast 不同的是这里因为在 DefaultChannelPipeline.callHandlerAddedForAllHandlers()已经将 registered 变为了 true,直接就调用 callHandlerAdded0(final AbstractChannelHandlerContext ctx),后面的逻辑就和上面说的一样

在 ChannelInitializer.initChannel(ChannelHandlerContext ctx)中执行完 initChannel(C ch)后会调用 pipeline.remove(this),最终在 DefaultChannelPipeline.atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx)将 ChannelInitializer 从 DefaultChannelPipeline 中 AbstractChannelHandlerContext 调用链中移除

1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
1
2
3
4
5
6
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
AbstractUnsafe.safeSetSuccess

图片丢失

RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)是使用 AtomicReferenceFieldUpdater.compareAndSet(T obj, V expect, V update)将 DefaultPromise 的 result 设置为 DefaultPromise.SUCCESS(因为 setValue0(result == null ? SUCCESS : result))

DefaultPromise.checkNotifyWaiters()判断 waiters 的数量是否大于 0,如果大于 0 就唤醒等待的线程(ChannelPromise.sync()中会调用 ChannelPromise.ChannelPromise 增加 waiters 和调用 Object.wait()等待唤醒)并且有 listeners 的话会返回 true,之后就会调用对应的 GenericFutureListener

图片丢失

DefaultPromise.notifyListeners()最终会调用 DefaultChannelPromise.addListener(GenericFutureListener<? extends Future<? super Void>> listener)添加进去的 GenericFutureListener

图片丢失

需要说明一下的是 DefaultPromise.notifyListenersNow()的代码都会 NIO 的线程里面执行,因为其判断逻辑是如果当前线程是 NIO 线程就直接用当前线程执行,如果不是就将其作为任务加入到 taskQueue 中等待执行

图片丢失

AbstractBootstrap.doBind0

图片丢失

inbound 事件会从 head 开始往后找到最后一个 inboundHandler 并执行事件相关方法,outbound 事件会从 tail 开始往前找到最后一个 outboundHandler 并执行事件相关方法

图片丢失

根据前面 outbound 事件的调用链逻辑发现最终会执行到 HeadContext.bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise),继续追下去找到 AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise),里面有两个需要注意的点

  • NioServerSocketChannel.doBind(SocketAddress localAddress) 将 ServerSocketChannel 绑定到具体某个端口
  • DefaultChannelPipeline.fireChannelActive()触发 inbound 事件调用对应 inboundHandler 对应的 channelActive(ChannelHandlerContext ctx)方法
  • AbstractUnsafe.safeSetSuccess(ChannelPromise promise) 唤醒因为 ChannelPromise(继承了 ChannelFuture)等待的线程,并且执行相应的 listeners
NioServerSocketChannel.doBind

图片丢失

这里用的是 java8 所以会执行到 javaChannel().bind(localAddress, config.getBacklosg()),这里这些的代码就是 NIO 里面的:

1
ssc.bind(new InetSocketAddress(8080));
DefaultChannelPipeline.fireChannelActive

图片丢失s

!wasActive && isActive()这段代码看出 DefaultChannelPipeline.fireChannelActive()这个方法要在 channel 第一次 active 的情况下才会执行,isActive()最终就是判断 ServerSocketChannel 的 open 是否为 true 并且 ServerSocketChannel 对应的 ServerSocketAdaptor.isBound()是否为 true,这里 open 默认为 true,区别是两次 isActive()的调用分别在 javaChannel().bind(localAddress, config.getBacklosg())的前后

图片丢失

图片丢失

图片丢失

不过这里不是直接调用而是通过 AbstractUnsafe.invokeLater(Runnable task),将任务加入到 taskQueue,之后在 NIO 线程中 SingleThreadEventExecutor.runAllTasks 执行

DefaultChannelPipeline.fireChannelActive()的执行逻辑就是前面说的 inbound 事件调用链,执行所有 inboundHandler 对应的 channelActive(ChannelHandlerContext ctx)方法

图片丢失

不过需要注意的是 HeadContext 的 channelActive(ChannelHandlerContext ctx)中会调用 HeadContext.readIfIsAutoRead(),最终会调用到 AbstractNioChannel.doBeginRead(),其中selectionKey.interestOps(interestOps | readInterestOp)(这里的 readInterestOp 就是 SelectionKey.OP_ACCEPT)相当于 NIO 中的:

1
sscKey.interestOps(SelectionKey.OP_ACCEPT);

图片丢失

图片丢失

图片丢失

AbstractUnsafe.safeSetSuccess

前面已经介绍了不再赘述

ServerBootstrap 启动中一些细节问题

EventLoop 何时被创建

在 NioEventLoopGroup(int nThreads) 构造器一路追踪,最后 MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object… args) 中调用 NioEventLoopGroup.newChild(Executor executor, Object… args)创建 EventLoop

图片丢失

在中间省略的过程中指定了一些 NioEventLoopGroup 和 NioEventLoop 的一些属性值,大致如下:

  • selectorProvider 是 NIO 中 Selector 的提供者在Selector.open()ServerSocketChannel.open()中都有用到,默认是通过SelectorProvider.provider()获取
  • nThreads 是指定 NioEventLoopGroup 中有多少个 NioEventLoop,如果没有指定默认是使用MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS,这个默认是当前系统可使用的处理器数量乘以 2
  • executor 每个 NioEventLoop 在 doStartThread()中创建线程的时候使用的 Executor,默认是ThreadExecutorMap.apply(final Executor executor, final EventExecutor evntExecutor)的匿名实现,其中参数中的 executor 默认是 ThreadPerTaskExecutor
  • rejectedExecutionHandler 是 NioEventLoop 中待执行任务 taskQueue 中任务超过一定数量之后的决策策略
  • maxPendingTasks 指定 taskQueue 中最多可以存放的待执行任务数,通过设置SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS指定,默认是 16
  • selectStrategyFactory 默认是 DefaultSelectStrategyFactory,构建出的 DefaultSelectStrategy 中的calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)会在 NioEventLoop 中的 run()方法中用到

提交普通任务时是否会结束 select 阻塞

select 方法默认阻塞的时间是获取定时任务队列 AbstractScheduledEventExecutor.scheduledTaskQueue 中的第一个任务的 deadlineNanos,如果定时任务队列中没有任务那么 curDeadlineNanos 就是 NioEventLoop.NONE,最终在 select 方法中会一直阻塞,直到被唤醒

图片丢失

前面config().group().register(channel)调用流程那张图中的 SingleThreadEventExecutor.execute(Runnable task)就是提交提交任务的方法,该法会继续调用 SingleThreadEventExecutor.execute(Runnable task, boolean immediate) ,其中 immediate 主要取决于任务是否不是 LazyRunnable,后面的 wakesUpForTask(task)在 SingleThreadEventExecutor 固定为 true

图片丢失

SingleThreadEventExecutor.execute(Runnable task, boolean immediate)中的 addTaskWakesUp 在 NioEventLoop 中默认就是 false,所以其实主要还是取决于前面说的 immediate 这个值

图片丢失

只有不是当前 NIO 线程并且 nextWakeupNanos 的值不是 AWAKE,才会调用 selector.wakeup()唤醒,所以多个线程多次调用 NioEventLoop.wakeup(boolean inEventLoop)下只有第一个线程会起到唤醒的效果,其他线程的调用都是无效操作浪费性能

图片丢失

扩展说下 addTaskWakesUp 的作用就是用于判断是否只要添加了任务就可以直接被唤醒,在 NioEventLoop 中它默认为 false 是因为这里线程的阻塞是因为调用了 Selector 的 select 方法,而 DefaultEventLoop 是任务队列中没有任务所以阻塞(在 SingleThreadEventExecutor.takeTask()中执行了 taskQueue.take()阻塞),只需要提交任务进去就会唤醒,所以 DefaultEventLoop 中 addTaskWakesUp 是 true

图片丢失

图片丢失

NioEventLoop.run()中的循环何时才会进入 SelectStrategy.SELECT 分支

前面提到DefaultSelectStrategy.calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)在这里就是用于获取循环需要进入哪个分支的判断依据,如果任务队列中没有任务就会进入 SelectStrategy.SELECT 分支,如果任务队列中有任务就会调用 selectSupplier.get()(这里 selectSupplier 是在 NioEventLoop 的匿名实现),最终调用的的是 Selector.selectNow()查看当 selector 上是否有 IO 事件,有则一并返回

图片丢失

图片丢失