OUZHANBO

对于我这种菜鸡来说,毕业等于失业

0%

Netty启动源码分析

Netty 启动源码分析

ServerBootstrap 的启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) {

NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
System.out.println(ch);
}
})
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
//主要关注的bind方法的底层逻辑
ChannelFuture sync = serverBootstrap.bind(8000).sync();
sync.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("operationComplete");
}
});
sync.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

bind 方法进入调试最终会进入到 AbstractBootstrap 里的doBind方法,其中有两个点:

  • AbstractBootstrap.initAndRegister:初始化 ServerSocketChannel 并将 ServerSocketChannel 注册到 selector 的方法

  • AbstractBootstrap.doBind0:ServerSocketChannel 绑定端口号的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private ChannelFuture doBind(final SocketAddress localAddress) {
//重点关注点
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
//重点关注点
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

AbstractBootstrap.initAndRegister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}

首先会调用 ChannelFactory 工厂类的方法得到一个 Channel,相当于 NIO 中的:

1
ServerSocketChannel ssc = ServerSocketChannel.open();

然后进入 init 方法,关键点在于利用刚刚得到的 channel 对象,创建了一个 pipeline,并且添加了一个 ChannelInitializer 处理器,监听初始化事件,在初始化事件中,使用 eventLoop 所在的 NIO 线程,提交一个任务,向 pipeline 中新增一个 ServerBootstrapAcceptor 用于处理新连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

外层的p.addLast(new ChannelInitializer<Channel>()添加进去的 ChannelInitializer 中的 initChannel 方法要在 AbstractUnsafe 的 register0 中 pipeline.invokeHandlerAddedIfNeeded() 方法调用时被执行

扩展:里面的pipeline.addLast(handler) 添加的就是我们自己配置的 ServerBootstrap.handler 里面的 ChannelInitializer,但是这里的 initChannel 方法就直接执行,因为 DefaultChannelPipeline 中的 registered 已经变为了 true,这个值也是因为前面调用了 invokeHandlerAddedIfNeeded 最终在 DefaultChannelPipeline 中的 callHandlerAddedForAllHandlers 中变为了 true

之后config().group().register(channel)这段代码的的调用流程图大致如下:

图片丢失

AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)的主要是使用 eventLoop.inEventLoop()判断当前线程是否是 eventLoop 的线程,如果在当前线程直接执行 register0(promise),否则交给 eventLoop 起一个新的线程执行

在这里现然不是,因为从上面的流程图可以看出创建新的线程是在 SingleThreadEventExecutor.doStartThread()方法里面实现的,具体是在 ThreadPerTaskExecutor.execute(Runnable command)中实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

图片丢失

图片丢失

SingleThreadEventExecutor.execute(Runnable task, boolean immediate)主要将任务添加到 taskQueue 中,并且判断当前线程是否是 eventLoop 的线程,如果不是就开启一个新的线程去执行 taskQueue 中的任务,这里添加到 taskQueue 中的任务是在 AbstractUnsafe.register 方法中匿名实现的,执行 register0(promise)

图片丢失

图片丢失

之后就是就是在 eventLoop 中线程的执行的流程逻辑了,执行逻辑的流程如下

图片丢失

在 SingleThreadEventExecutor.doStartThread()启动一个新线程并将线程赋值给变量 thread,在这之后在该线程内 eventLoop.inEventLoop()返回的都是 true 了

SingleThreadEventExecutor.runAllTasks(long timeoutNanos)将 taskQueue 的任务取出来并执行

图片丢失

之后就是 register0(ChannelPromise promise)的执行流程

图片丢失

  • AbstractNioChannel.doRegister()将 NIO 的 ServerSocketChannel 注册到 Selector 中
  • DefaultChannelPipeline.invokeHandlerAddedIfNeeded() 执行 pendingHandlerCallbackHead 中的任务
  • AbstractUnsafe.safeSetSuccess(ChannelPromise promise) 唤醒因为 ChannelPromise(继承了 ChannelFuture)等待的线程,并且执行相应的 listeners
AbstractNioChannel.doRegister
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

执行selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);将 NIO 的 ServerSocketChannel 注册到 Selector 中,并且将当前的 NioServerSocketChannel 作为 attachment 加入到 NIO 的 SelectionKey 中,相当于 NIO 里面的:

1
SelectionKey sscKey = ssc.register(selector, 0, attach);
DefaultChannelPipeline.invokeHandlerAddedIfNeeded

DefaultChannelPipeline.invokeHandlerAddedIfNeeded() 会在 callHandlerAddedForAllHandlers()执行 pendingHandlerCallbackHead 中的任务

1
2
3
4
5
6
7
8
9
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;

// This Channel itself was registered.
registered = true;

pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}

// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}

在这里 pendingHandlerCallbackHead 中就有一个在前面 ServerBootstrap.init(Channel channel)中通过p.addLast(new ChannelInitializer<Channel>()添加进去的 PendingHandlerAddedTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

Last0(newCtx);

// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;

PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

PendingHandlerAddedTask.execute()的执行流程如下

图片丢失

最终调用的是 ServerBootstrap 中匿名实现的 ChannelInitializer.initChannel(C ch),到了这里回到前面说的p.addLast(new ChannelInitializer<Channel>()添加进去的 ChannelInitializer,执行它的 initChannel(final Channel ch)将我们在 ServerBootstrap.handler 里面的 ChannelInitializer 添加到 DefaultChannelPipeline 中的 AbstractChannelHandlerContext 的调用链中,但是和前面 addLast 不同的是这里因为在 DefaultChannelPipeline.callHandlerAddedForAllHandlers()已经将 registered 变为了 true,直接就调用 callHandlerAdded0(final AbstractChannelHandlerContext ctx),后面的逻辑就和上面说的一样

在 ChannelInitializer.initChannel(ChannelHandlerContext ctx)中执行完 initChannel(C ch)后会调用 pipeline.remove(this),最终在 DefaultChannelPipeline.atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx)将 ChannelInitializer 从 DefaultChannelPipeline 中 AbstractChannelHandlerContext 调用链中移除

1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
1
2
3
4
5
6
private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
AbstractUnsafe.safeSetSuccess

图片丢失

RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)是使用 AtomicReferenceFieldUpdater.compareAndSet(T obj, V expect, V update)将 DefaultPromise 的 result 设置为 DefaultPromise.SUCCESS(因为 setValue0(result == null ? SUCCESS : result))

DefaultPromise.checkNotifyWaiters()判断 waiters 的数量是否大于 0,如果大于 0 就唤醒等待的线程(ChannelPromise.sync()中会调用 ChannelPromise.ChannelPromise 增加 waiters 和调用 Object.wait()等待唤醒)并且有 listeners 的话会返回 true,之后就会调用对应的 GenericFutureListener

图片丢失

DefaultPromise.notifyListeners()最终会调用 DefaultChannelPromise.addListener(GenericFutureListener<? extends Future<? super Void>> listener)添加进去的 GenericFutureListener

图片丢失

需要说明一下的是 DefaultPromise.notifyListenersNow()的代码都会 NIO 的线程里面执行,因为其判断逻辑是如果当前线程是 NIO 线程就直接用当前线程执行,如果不是就将其作为任务加入到 taskQueue 中等待执行

图片丢失

AbstractBootstrap.doBind0

图片丢失

inbound 事件会从 head 开始往后找到最后一个 inboundHandler 并执行事件相关方法,outbound 事件会从 tail 开始往前找到最后一个 outboundHandler 并执行事件相关方法

图片丢失

根据前面 outbound 事件的调用链逻辑发现最终会执行到 HeadContext.bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise),继续追下去找到 AbstractUnsafe.bind(final SocketAddress localAddress, final ChannelPromise promise),里面有两个需要注意的点

  • NioServerSocketChannel.doBind(SocketAddress localAddress) 将 ServerSocketChannel 绑定到具体某个端口
  • DefaultChannelPipeline.fireChannelActive()触发 inbound 事件调用对应 inboundHandler 对应的 channelActive(ChannelHandlerContext ctx)方法
  • AbstractUnsafe.safeSetSuccess(ChannelPromise promise) 唤醒因为 ChannelPromise(继承了 ChannelFuture)等待的线程,并且执行相应的 listeners
NioServerSocketChannel.doBind

图片丢失

这里用的是 java8 所以会执行到 javaChannel().bind(localAddress, config.getBacklosg()),这里这些的代码就是 NIO 里面的:

1
ssc.bind(new InetSocketAddress(8080));
DefaultChannelPipeline.fireChannelActive

图片丢失

!wasActive && isActive()这段代码看出 DefaultChannelPipeline.fireChannelActive()这个方法要在 channel 第一次 active 的情况下才会执行,isActive()最终就是判断 ServerSocketChannel 的 open 是否为 true 并且 ServerSocketChannel 对应的 ServerSocketAdaptor.isBound()是否为 true,这里 open 默认为 true,区别是两次 isActive()的调用分别在 javaChannel().bind(localAddress, config.getBacklosg())的前后

图片丢失

图片丢失

图片丢失

不过这里不是直接调用而是通过 AbstractUnsafe.invokeLater(Runnable task),将任务加入到 taskQueue,之后在 NIO 线程中 SingleThreadEventExecutor.runAllTasks 执行

DefaultChannelPipeline.fireChannelActive()的执行逻辑就是前面说的 inbound 事件调用链,执行所有 inboundHandler 对应的 channelActive(ChannelHandlerContext ctx)方法

图片丢失

不过需要注意的是 HeadContext 的 channelActive(ChannelHandlerContext ctx)中会调用 HeadContext.readIfIsAutoRead(),最终会调用到 AbstractNioChannel.doBeginRead(),其中selectionKey.interestOps(interestOps | readInterestOp)(这里的 readInterestOp 就是 SelectionKey.OP_ACCEPT)相当于 NIO 中的:

1
sscKey.interestOps(SelectionKey.OP_ACCEPT);

图片丢失

图片丢失

图片丢失

AbstractUnsafe.safeSetSuccess

前面已经介绍了不再赘述

ServerBootstrap 启动中一些细节问题

EventLoop 何时被创建

在 NioEventLoopGroup(int nThreads) 构造器一路追踪,最后 MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object… args) 中调用 NioEventLoopGroup.newChild(Executor executor, Object… args)创建 EventLoop

图片丢失

在中间省略的过程中指定了一些 NioEventLoopGroup 和 NioEventLoop 的一些属性值,大致如下:

  • selectorProvider 是 NIO 中 Selector 的提供者在Selector.open()ServerSocketChannel.open()中都有用到,默认是通过SelectorProvider.provider()获取
  • nThreads 是指定 NioEventLoopGroup 中有多少个 NioEventLoop,如果没有指定默认是使用MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS,这个默认是当前系统可使用的处理器数量乘以 2
  • executor 每个 NioEventLoop 在 doStartThread()中创建线程的时候使用的 Executor,默认是ThreadExecutorMap.apply(final Executor executor, final EventExecutor evntExecutor)的匿名实现,其中参数中的 executor 默认是 ThreadPerTaskExecutor
  • rejectedExecutionHandler 是 NioEventLoop 中待执行任务 taskQueue 中任务超过一定数量之后的决策策略
  • maxPendingTasks 指定 taskQueue 中最多可以存放的待执行任务数,通过设置SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS指定,默认是 16
  • selectStrategyFactory 默认是 DefaultSelectStrategyFactory,构建出的 DefaultSelectStrategy 中的calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)会在 NioEventLoop 中的 run()方法中用到

提交普通任务时是否会结束 select 阻塞

select 方法默认阻塞的时间是获取定时任务队列 AbstractScheduledEventExecutor.scheduledTaskQueue 中的第一个任务的 deadlineNanos,如果定时任务队列中没有任务那么 curDeadlineNanos 就是 NioEventLoop.NONE,最终在 select 方法中会一直阻塞,直到被唤醒

图片丢失

前面config().group().register(channel)调用流程那张图中的 SingleThreadEventExecutor.execute(Runnable task)就是提交提交任务的方法,该法会继续调用 SingleThreadEventExecutor.execute(Runnable task, boolean immediate) ,其中 immediate 主要取决于任务是否不是 LazyRunnable,后面的 wakesUpForTask(task)在 SingleThreadEventExecutor 固定为 true

图片丢失

SingleThreadEventExecutor.execute(Runnable task, boolean immediate)中的 addTaskWakesUp 在 NioEventLoop 中默认就是 false,所以其实主要还是取决于前面说的 immediate 这个值

图片丢失

只有不是当前 NIO 线程并且 nextWakeupNanos 的值不是 AWAKE,才会调用 selector.wakeup()唤醒,所以多个线程多次调用 NioEventLoop.wakeup(boolean inEventLoop)下只有第一个线程会起到唤醒的效果,其他线程的调用都是无效操作浪费性能

图片丢失

扩展说下 addTaskWakesUp 的作用就是用于判断是否只要添加了任务就可以直接被唤醒,在 NioEventLoop 中它默认为 false 是因为这里线程的阻塞是因为调用了 Selector 的 select 方法,而 DefaultEventLoop 是任务队列中没有任务所以阻塞(在 SingleThreadEventExecutor.takeTask()中执行了 taskQueue.take()阻塞),只需要提交任务进去就会唤醒,所以 DefaultEventLoop 中 addTaskWakesUp 是 true

图片丢失

图片丢失

NioEventLoop.run()中的循环何时才会进入 SelectStrategy.SELECT 分支

前面提到DefaultSelectStrategy.calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)在这里就是用于获取循环需要进入哪个分支的判断依据,如果任务队列中没有任务就会进入 SelectStrategy.SELECT 分支,如果任务队列中有任务就会调用 selectSupplier.get()(这里 selectSupplier 是在 NioEventLoop 的匿名实现),最终调用的的是 Selector.selectNow()查看当 selector 上是否有 IO 事件,有则一并返回

图片丢失

图片丢失

图片丢失

图片丢失

ioRatio 的作用

ioTime * (100 - ioRatio) / ioRatio) 这段代码看出 ioRatio 是用于决定 IO 事件和普通事件的处理时间比,例如现在 IO 事件执行的时间为 4 分钟,ioRatio 是 20(默认是 50),那么执行普通事件需要可以用的执行时间就是4 * 4 = 16 分钟 ,如果 ioRatio 是 100 默认会执行所有的普通事件,如果 ioRatio 不是 100 并且没有没有 IO 事件(前面说过 strategy 要大于 0 才是有 IO 事件)就runAllTasks(0),执行最少的普通事件

图片丢失

图片丢失

NioEventLoop.processSelectedKeys()的优化处理

图片丢失

NioEventLoop.processSelectedKeysOptimized()和 NioEventLoop.processSelectedKeysPlain(Set<SelectionKey>selectedKeys)主要是 selectedKeys 的不同,前者对 selectedKeys 进行了优化,selectedKeys 使用的是 SelectedSelectionKeySet(继承了 AbstractSet<SelectionKey>),底层是一个 SelectionKey 数组,提高了遍历的速度,后者是 SelectorImpl 中的 publicSelectedKeys

图片丢失

图片丢失

图片丢失

final Object a = k.attachment();获取到的是在前面

而这里 SelectorImpl 中的 selectedKeys 和 publicSelectedKeys 成员变量都通过反射替换成了 SelectedSelectionKeySet,具体是通过 NioEventLoop 的构造方法中调用了SelectorTuple selectorTuple = openSelector(),最终在 NioEventLoop.openSelector()反射赋值的

图片丢失

扩展说下 SelectorImpl 中的 keys、selectedKeys、publicKeys、publicSelectedKeys

  1. keys

    • 类型:HashSet<SelectionKey>
    • 作用:存储所有注册到该选择器上的 SelectionKey 对象
    • 说明:当通过 Selector.register() 方法注册一个通道(Channel)时,会生成一个 SelectionKey 并将其添加到 keys 集合中,这个集合包含了所有与选择器关联的通道的 SelectionKey
  2. selectedKeys

    • 类型:HashSet<SelectionKey>
    • 作用:存储当前已准备好进行 I/O 操作的 SelectionKey 对象
    • 说明:当调用 Selector.select() 方法时,选择器会检查所有注册的通道,并将已准备好进行 I/O 操作的通道对应的 SelectionKey 添加到 selectedKeys 集合中,这个集合用于表示当前有哪些通道可以进行 I/O 操作
  3. publicSelectedKeys

    • 类型:Set<SelectionKey>
    • 作用:存储当前已准备好进行 I/O 操作的 SelectionKey 对象
    • 说明:publicKeys 是 keys 集合的不可修改视图,通过 Selector.keys() 方法返回,外部代码可以通过 publicKeys 查看所有注册的 SelectionKey,但不能修改它
  4. publicSelectedKeys

    • 类型:Set<SelectionKey>
    • 作用:对外暴露的 selectedKeys 集合的视图
    • 说明:ublicSelectedKeys 是 selectedKeys 集合的视图,通过 Selector.selectedKeys() 方法返回,外部代码可以通过 publicSelectedKeys 查看当前已准备好进行 I/O 操作的 SelectionKey,并可以从中移除已处理的 SelectionKey,但是不能添加

图片丢失

图片丢失

图片丢失

处理 IO 事件

processSelectedKey(SelectionKey k, AbstractNioChannel ch)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

其中readyOps = k.readyOps()已经就绪的事件,(readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0检查其中是否有 SelectionKey.OP_READ 事件和 SelectionKey.OP_ACCEPT 事件, 因为是服务端 ServerSocketChannel 只对 SelectionKey.OP_ACCEPT 类型的事件感兴趣,前面selectionKey.interestOps(interestOps | readInterestOp)中设置的

扩展:在 Java NIO 中,readyOps() 返回的是通道当前 已就绪的事件集合,而 interestOps() 返回的是通道 注册的感兴趣的事件集合。如果 interestOps 中没有注册某个事件,那么 readyOps 中 不会 返回该事件。

  • interestOps:表示你希望监听哪些事件。

  • readyOps:表示哪些事件已经就绪。

  • 关系:readyOps 只会返回 interestOps 中注册的事件。如果某个事件没有在 interestOps 中注册,即使它已经就绪,readyOps 也不会返回它。

NioMessageUnsafe.read()

前面通过final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe()获取到的 unsafe 的实现是 NioMessageUnsafe,因为 NioServerSocketChannel 继承了 AbstractNioMessageChannel,而在通过 ReflectiveChannelFactory 反射创建 NioServerSocketChannel 的调用过程中父类构造 AbstractChannel(Channel parent)中unsafe = newUnsafe()调用了 AbstractNioMessageChannel 中的 newUnsafe()方法返回一个 NioMessageUnsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
    public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//获取到的是AdaptiveRecvByteBufAllocator中的HandleImpl
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
//设置allocHandle中maxMessagePerRead为16,totalMessages和totalBytesRead为0
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
//获取NioSocketChannel并且加入到readBuf中
int localRead = doReadMessages(readBuf);
//获取不到NioSocketChannel退出循环
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//让totalMessages加1
allocHandle.incMessagesRead(localRead);
//根据totalMessages和maxMessagePerRead和判断是否需要继续执行
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//调用fireChannelRead,对获取到的NioSocketChannel进行处理
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
//原本以为totalBytesRead会在里面更新,然而并没有
allocHandle.readComplete();
//和前面的fireChannelActive类似,在HeadContext中都会readIfIsAutoRead()方法
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();获取到的是 AdaptiveRecvByteBufAllocator 中的 HandleImpl,unsfae()和前面获取到的一样也是 NioMessageUnsafe,recvBufAllocHandle()的逻辑如下:

1
2
3
4
5
6
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}

config()中的得到的是 NioServerSocketChannelConfig,在NioServerSocketChannel(ServerSocketChannel channel)的构造方法中指定的,在父类构造方法DefaultChannelConfig(Channel channel)中创建了 AdaptiveRecvByteBufAllocator 的对象之后在DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator)通过setRecvByteBufAllocator(allocator, channel.metadata());对 DefaultChannelConfig.rcvBufAllocator 赋值,其中还会调用((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());设置 DefaultMaxMessagesRecvByteBufAllocator.maxMessagesPerRead 的值为 16(后续continueReading(allocHandle)中会用到),这里获取到的 metadata 是 NioServerSocketChannel 中的 METADATA 默认值是new ChannelMetadata(false, 16);,最终得到的的就是 AdaptiveRecvByteBufAllocator 中的 HandleImp

allocHandle.reset(config);方法中设置 MaxMessageHandle 中的 maxMessagePerRead、totalMessages、totalBytesRead,其中maxMessagesPerRead()获取到的就是 DefaultMaxMessagesRecvByteBufAllocator.maxMessagesPerRead

1
2
3
4
5
6
@Override
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}

int localRead = doReadMessages(readBuf);调用的是 NioServerSocketChannel 的实现,通过调用SocketUtils.accept(javaChannel());获取到 nio 的 SocketChannel,之后就是将获取到的 SocketChannel 用于构建 netty 的 NioSocketChannel,并将构建后的 NioSocketChannel 放入到 buf 中,提供后续使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected int doReadMessages(List<Object> buf) throws Exception {
//其中调用ServerSocketChannel.accepet()
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
//将nio的SocketChannel用于构建netty的NioSocketChannel,并加入到buf中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

在 NioSocketChannel 的父类 AbstractNioByteChannel 构造方法AbstractNioByteChannel(Channel parent, SelectableChannel ch)指定了 readInterestOp 为 SelectionKey.OP_READ

allocHandle.incMessagesRead(localRead);让 allocHandle 中的 totalMessages 加 1,因为 localRead 为 1

1
2
3
4
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}

continueReading(allocHandle)最终会调用MaxMessageHandle.continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier),在其中主根据totalMessages < maxMessagePerReadtotalBytesRead > 0决定是是否要继续执行循环,这里就用到前面说的 maxMessagePerRead、totalMessages、totalBytesRead,不过这里也不是很明白为什么 totalBytesRead 没有增加一直都是 0,本以为 maxMessagePerRead 为 16 可以最多一次可以获取 16 个 NioSocketChannel

1
2
3
4
5
6
7
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}

pipeline.fireChannelRead(readBuf.get(i));是一个 inbound 事件,最终会调用前面说到的 ServerBootstrapAcceptor 中的 channelRead(ChannelHandlerContext ctx, Object msg)方法,和前面 config().group().register(channel)的执行逻辑一样,只是这里用的是 childGroup,所以不在赘述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//childHandler就是前面main方法中通过childHandler方法指定的
child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);

try {
//这里和前面config().group().register(channel)的执行逻辑一样,只是这里用的是childGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

allocHandle.readComplete();中会调用 MaxMessageHandle.totalBytesRead 方法,因为 totalBytesRead 一直是 0 所以保持不变

1
2
3
protected final int totalBytesRead() {
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}

pipeline.fireChannelReadComplete();的逻辑和pipeline.fireChannelActive()类似,只是里面触发的是 ChannelInboundHandler.channelReadComplete 方法,在其中的 HeadContexts 中也会调用 readIfIsAutoRead()方法,只不过在 AbstractNioChannel.doBeginRead()方法中if ((interestOps & readInterestOp) == 0)不再是 true,因为前面 fireChannelActive 已经执行过了

至此 ServerBootstrap 和 NioServerSocketChannel 相关的主要处理逻辑都已经过粗略的看了一遍,更多的细节可以自行阅读源码