学习链接
- Netty核心技术十–Netty 核心源码剖析
- Netty核心技术九–TCP 粘包和拆包及解决方案
- Netty核心技术七–Google Protobuf
- Netty核心技术六–Netty核心模块组件
- Netty核心技术五–Netty高性能架构设计
- Netty源码分析 (一)----- NioEventLoopGroup
- Netty源码分析 (二)----- ServerBootstrap
- Netty源码分析 (三)----- 服务端启动源码分析
- Netty源码分析 (四)----- ChannelPipeline
- Netty源码分析 (五)----- 数据如何在 pipeline 中流动
- Netty源码分析 (六)----- 客户端接入accept过程
- Netty源码分析 (七)----- read过程 源码分析
- Netty源码分析 (八)----- write过程 源码分析
- Netty源码分析 (九)----- 拆包器的奥秘
- Netty源码分析 (十)----- 拆包器之LineBasedFrameDecoder
- Netty源码分析 (十一)----- 拆包器之LengthFieldBasedFrameDecoder
- Netty源码分析 (十二)----- 心跳服务之 IdleStateHandler 源码分析
1. 源码分析
1.1 启动剖析
我们就来看看 netty 中对下面的代码是怎样进行处理的
(先明确Java nio的基础步骤如下,而netty在启动过程中,也是需要做下面的事情的)
//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//4 启动 nio boss 线程执行接下来的操作
//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
// (注意,这里将NioServerSocketChannel作为附件绑定到了selectionKey上,当此ServerSocketChannel有可连接事件时,就可以获取到此selectionKey,从而获取到对应的NioServerSocketChannel)
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
// (ServerBootstrapAcceptor是ChannelInboundHandlerAdapter入站类型的处理器)
//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
源码入口,可以从下面的代码进入
(暂时先不看NioEventLoopGroup,而Selector是存在于NioEventLoop中的,所以selector.open暂时不看)
public class TestSourceServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
}
})
.bind(8880); // 以bind为入口
}
}
AbstractBootstrap#doBind
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
(1、注意main线程和nio线程的切换;
2、initAndRegister 对应 nio中 创建ServerSocketChannel 和 把ServerSocketChannel注册到selector上
3、doBind0 对应 nio中 bind监听端口)
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2. 因为 initAndRegister 是异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
// 2.1 如果已经完成(如果前面做的比较快,就进入这个if块)
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 3.1 立刻调用 doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
// 2.2 还没有完成
else {
final PendingRegistrationPromise promise
= new PendingRegistrationPromise(channel);
// 3.2 回调 doBind0
regFuture.addListener(new ChannelFutureListener() {
// (这个operationComplete是由nio线程来调用的)
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 处理异常...
promise.setFailure(cause);
} else {
promise.registered();
// 3. 由注册线程去执行 doBind0
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
AbstractBootstrap#initAndRegister
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//(1、这里会去调用NioServerSocketChannel的无参构造方法去得到channel
// 2、在NioServerSocketChannel的无参构造方法中会去创建javanio的ServerSocketChannel,
// 并且将该ServerSocketChannel维护在NioServerSocketChannel中,
// 并配置为非阻塞模式,感兴趣的事件是OP_ACCEPT,但是还没注册到selector上,
// 只是维护了这些基本信息到NioServerSocketChannel。
// 并且在NioServerSocketChannel的构造方法中会去创建NioServerSocketChannelConfig
// 维护到NioServerSocketChannel中。
// 并且NioServerSocketChannel的构造方法中会去创建DefaultChannelPipeline
// 维护到NioServerSocketChannel中)
channel = channelFactory.newChannel();
// 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
init(channel);
} catch (Throwable t) {
// 处理异常...
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
// (这里会从eventLoopGroup中挑选出1个eventLoop来注册ServerSocketChannel)
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// 处理异常...
}
return regFuture;
}
ServerBootstrap#init
关键代码 io.netty.bootstrap.ServerBootstrap#init
(这里会给ServerSocketChannel的pipeline中添加1个ChannelInitializer初始化器,该初始化器只会执行1次,后续将会移除掉。)
// 这里 channel 实际上是 NioServerSocketChannel
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 为 NioServerSocketChannel 的pipeline 添加初始化器!!!
// (1、该初始化器的initChannel方法只会执行1次,后续该初始化器将会移除掉,
// 移除动作是在ChannelInitializer#initChannel中操作的。
// 2、注意该初始化器的initChannel方法在此处尚未被调用。
// 3、initChannel方法的调用时机是在AbstractChannel的register0方法中,
// 在做完将channel注册到selector上之后的
// pipeline.invokeHandlerAddedIfNeeded()这句代码调用的)
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// (这里意味着可以通过配置给config1个handler,
// 从而给serverSocketChannel的pipeline添加1个handler)
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 1、初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
// 2、ServerBootstrapAcceptor的作用是在selector触发可连接事件时,建立连接
// 3、保证添加这个动作是在nio线程中完成的
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup,
currentChildHandler,
currentChildOptions,
currentChildAttrs)
);
}
});
}
});
}
AbstractUnsafe#register
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 一些检查,略...
AbstractChannel.this.eventLoop = eventLoop;
// (判断当前线程是不是eventLoop的线程,因为顺着刚刚的逻辑,当前还在主线程中,所以走else)
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行
// 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
// 这行代码完成的事实是 main -> nio boss 线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
// (该方法在nio线程上执行,并且注意promise传进去了,用于通知其它线程)
register0(promise);
}
});
} catch (Throwable t) {
// 日志记录...
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
#####- *AbstractUnsafe#register0
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 1.2.1 【将原生的 nio channel 绑定到 selector 上】,
// 注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
doRegister();
neverRegistered = false;
registered = true;
// 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
//(1、调用到ServerBootstrap的init方法中为NioServerSocketChannel的
// pipeline添加的初始化器的initChannel方法。
// 2、该initChannel方向pipeline中添加了ServerBootstrapAcceptor这个入站处理器)
pipeline.invokeHandlerAddedIfNeeded();
// (给promise对象1个成功的结果,这样前面的监听就能收到这个结果触发operationComplete方法,
// 就会去通知前面在AbstractBootstrap#doBind方法中注册的监听去做doBind0绑定监听端口)
// 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 对应 server socket channel 还未绑定,isActive 为 false
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将java的channel注册到了eventLoop的selector上
// (此时,尚未注册感兴趣的事件。同时,注意当前this作为附件绑定到了selectionKey)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
- ChannelInitializer#initChannel
关键代码 io.netty.channel.ChannelInitializer#initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 1.2.2.1 执行初始化!!!(调用前面添加的初始化器的initChannel方法)
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 1.2.2.2 移除初始化器!!!(调用完成后,移除初始化器)
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
AbstractBootstrap#doBind0
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
// 3.1 或 3.2 执行 doBind0
private static void doBind0(final ChannelFuture regFuture,
final Channel channel,
final SocketAddress localAddress,
final ChannelPromise promise) {
// 1、确保执行是在nio eventLoop线程中执行
// 2、绑定会从pipe的tail开始找,最终会到headContext中调用到AbstractUnsafe的bind方法
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
AbstractUnsafe#bind
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// 记录日志...
}
boolean wasActive = isActive();
try {
// 3.3 【执行端口绑定】
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 从这里可以看出是绑定端口后,再去触发active事件的
// 当前serverSocketChannel的pipeline已经添加了head-acceptor-tail处理器链,
// 并且已经绑定好端口了,所以这里触发pipeline上所有handler的active事件,
// 接下来,去看HeadContext#channelActive方法
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 3.4 【触发 active 事件】
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
- NioServerSocketChannel#doBind
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用java原生channel的绑定端口的方法
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
- HeadContext#channelActive
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) {
// 触发所有handler的active事件
ctx.fireChannelActive();
// 从这里可以看出是所有handler的active触发之后,再将channel注册感兴趣的事件的
// 触发 read ,目的是为了触发channel的事件注册,注册OP_ACCEPT事件,
// 见AbstractNioChannel#doBeginRead
// (注意: NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
readIfIsAutoRead();
}
– AbstractNioChannel#doBeginRead
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// readInterestOp 取值是 16,在NioServerSocketChannel创建时初始化好,代表关注 accept 事件
if ((interestOps & readInterestOp) == 0) {
// 注册感兴趣的事件!!!
selectionKey.interestOps(interestOps | readInterestOp);
}
}
1.2 NioEventLoop 剖析
NioEventLoop的重要组成
1、在NioEventLoop类中有成员变量
private Selector selector;
private Selector unwrappedSelector;
2、在NioEventLoop的父类SingleThreadEventExecutor中有成员变量:
private volatile Thread thread;
// 使用的跟上面同1个thread
private final Executor executor;
// 由于eventLoop是单线程,其它的任务先放在taskQueue任务队列中,然后由单线程依次执行
private final Queue<Runnable> taskQueue;
3、在NioEventLoop的父类的父类AbstractScheduledEventExecutor中有成员变量
// 用来处理定时任务
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务)
selector何时创建
在NioEventLoop的唯一构造方法中,创建了selector
NioEventLoop(NioEventLoopGroup parent,
Executor executor,
SelectorProvider selectorProvider,
SelectStrategy strategy,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false,
newTaskQueue(taskQueueFactory),
newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 在这里创建selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
那为什么会有2个selector呢?
因为netty要把selector里面原来的selectionKey的set实现改为用数组实现,因为数组遍历的性能比set好!
nio线程在何时启动
(当首次调用eventLoop的execute方法时,会启动线程,并且state状态位控制只会启动1次。)
public class TestEventLoop {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
group.next()
// 入口
.execute(() -> {
System.out.println("Hello");
});
}
}
SingleThreadEventExecutor#execute
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判断当前线程是否是eventLoop的thread,很显然,现在eventLoop的thread是null
boolean inEventLoop = inEventLoop();
// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
addTask(task);
if (!inEventLoop) {
// inEventLoop 如果为 false 表示由其它线程来调用 execute,
// 即首次调用时,需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
startThread();
if (isShutdown()) {
// 如果已经 shutdown,做拒绝逻辑,代码略...
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
// 如果线程由于 IO select 阻塞了,添加任务的线程需要负责唤醒 NioEventLoop 线程
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
// 这个executor是在 MultithreadEventExecutorGroup的构造方法中初始化的,
// (直接new的ThreadPerTaskExecutor)
executor.execute(new Runnable() {
@Override
public void run() {
// 将线程池的当前线程保存在成员变量中,以便后续使用
// 将thread线程设置为执行线程
// (所以eventLoopgroup中的executor属性的线程和thread属性是同一个线程)
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
// 【启动 EventLoop 主循环 】
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 清理工作,代码略...
}
}
});
}
@Override
protected void wakeup(boolean inEventLoop) {
// !eventLoop的理解: 只有其它非nio线程提交任务,才会有机会去唤醒selector停止阻塞
// (因为如果是eventLoop自己提交任务给自己,在提交的时候,
// 当前eventLoop正在运行,没有阻塞,所以不需要唤醒selector)
// wakenUp的理解: 当多个其它非nio线程提交任务,那么只会将selector唤醒1次
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
// 唤醒 select 阻塞线程
// (这个wakeup调用后,如果selector正在select,那么直接唤醒,
// 如果selector还没select,那么该selector去select时,就不会阻塞。
// 类似于LockSupport的park和unpark。)
selector.wakeup();
}
}
*NioEventLoop#run
io.netty.channel.nio.NioEventLoop#run
主要任务是执行死循环,不断看有没有新任务,有没有定时任务,有没有 IO 事件,如果有,则执行。
// 死循环执行
for (;;) {
try {
try {
//calculateStrategy 的逻辑如下:
/* [代码] hasTasks ? selectNow() : SelectStrategy.SELECT; */
// 当有任务时, 会执行一次selectNow(),去获取看看是否有io事件,
// 并且会清除上一次的wakeup结果, 无论有没有IO事件,都会跳过switch
// (因为有任务的话,即便没有io事件,也得干活,所以没有必要阻塞了)
// 当没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
// (因为没有任务的话,那就等有io事件了,再干活,所以就设置超时阻塞,
// 同时还要看在阻塞期间有其它非nio线程提交任务,并唤醒selector。
// 那么默认阻塞多久呢?那就需要看NioEventLoop#select(boolean oldWakenUp)方法
// 默认是阻塞1s + 0.5ms,不过还得看有没有定时任务。)
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // -2
continue;
case SelectStrategy.BUSY_WAIT: // -3
case SelectStrategy.SELECT: // -1
// 因为IO线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,
// 因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
// 进行 select 阻塞,并设置唤醒状态为 false
boolean oldWakenUp = wakenUp.getAndSet(false);
// 这里select方法中会调用select(timeoutMillis)阻塞,那么什么时候唤醒呢?
// 当有io事件时自动唤醒
// 或者超时自动唤醒
// 或者有任务提交时,手动唤醒以便及时处理io事件以外的普通任务
// 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
// 下面的 select 方法不会阻塞
// 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
// 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
// 才能执行,让 select 方法无谓阻塞
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
// 有任务 或者 正在等待io事件但io事件还没来就被唤醒了 或者 io事件来了
cancelledKeys = 0;
needsToSelectAgain = false;
// (如果eventLoop在执行非io任务的事件过长,势必会影响到io事件的处理)
// ioRatio 默认是 50
final int ioRatio = this.ioRatio;
// 如果ioRatio设置为 100,那么会让普通任务都运行完。
// 如果ioRatio不设置为100,那么会根据io事件处理的运行时间,算出普通任务可以运行的时间,
// 算出的这个时间仅仅是用来判断要不要继续运行下1个普通任务,
// 因此,如果1个普通任务本身耗时就特别长,
// 这里是没有中断这个任务的说法的,而且还得任务响应中断才行。
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// ioRatio 为 100 时,总是运行完所有非 IO 任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 记录 io 事件处理耗时
final long ioTime = System.nanoTime() - ioStartTime;
// 运行非 IO 任务,一旦超时会退出 runAllTasks
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
NioEventLoop#select
io.netty.channel.nio.NioEventLoop#select
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 计算等待时间
// * 没有 scheduledTask定时任务,超时时间为 1s
// * 有 scheduledTas定时任务k,超时时间为 `下一个定时任务执行时间 - 当前时间`
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos
- currentTimeNanos
+ 500000L) / 1000000L;
// 如果超时,退出循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果期间又有task退出循环,如果没这个判断,那么任务就会等到下次select超时时才能被执行
// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// select 有限时阻塞
// 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,
// 导致不断空轮询,cpu 占用 100%
// (所以就用了 selectCnt ++ 来统计次数,因为如果bug发生的话,循环会很快,
// 这样selectCnt就会猛增,就检测到了)
int selectedKeys = selector.select(timeoutMillis);
// 计数加 1
selectCnt ++;
// 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
if (selectedKeys != 0
|| oldWakenUp
|| wakenUp.get()
|| hasTasks()
|| hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
// 线程被打断,退出循环
// 记录日志
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time-TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 如果超时,计数重置为 1,下次循环就会 break
selectCnt = 1;
}
// 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
// 这是为了解决 nio 空轮询 bug
// (重新创建1个selector,来替换原来的selector,来解决nio空轮询bug)
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0
&& selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 重建 selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
// 记录日志
}
} catch (CancelledKeyException e) {
// 记录日志
}
}
NioEventLoop#processSelectedKeys
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
private void processSelectedKeys() {
if (selectedKeys != null) {
// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
// SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
// 遍历所有的selectionKey
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 获取完就置为null
selectedKeys.keys[i] = null;
// 附件就是 NioServerSocketChannel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 处理selectionKey
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
- NioEventLoop#processSelectedKey
io.netty.channel.nio.NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 当 key 取消或关闭时会导致这个 key 无效
if (!k.isValid()) {
// 无效时处理...
return;
}
try {
int readyOps = k.readyOps();
// 连接事件
// (这个是客户端需要监听处理的事件,服务端就不用管)
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// OP_ACCEPT 和 OP_CONNECT的理解:
// ServerBootstrap用来处理服务器端,而Bootstrap用于客户端。当服务器绑定端口后,会注册OP_ACCEPT事件,等待客户端连接。一旦有连接进来,就会触发这个事件,然后创建子Channel来处理通信。而客户端在连接服务器时,会发起非阻塞的连接操作,这时候会注册OP_CONNECT,当连接建立完成后,触发该事件,之后就可以进行读写操作了
// OP_CONNECT只是在连接过程中注册,一旦连接成功,就会触发,之后可能需要修改感兴趣的事件为OP_READ等。
// 需要注意,当连接失败时,OP_CONNECT也会触发,这时候需要处理异常情况。比如,在Netty中,连接失败会触发相应的异常处理机制,比如channel的exceptionCaught方法
// 总结来说,OP_ACCEPT是服务器端用于接收新连接,而OP_CONNECT是客户端用于处理连接建立完成的事件
// 可读:当客户端或服务端的缓冲区有接收到数据,这时候,就会通知程序有数据可以读了,然后这里就会触发read事件,然后handler就使用channel去读取数据,假设这里在handler里面只读了1半数据,然后就不读了,就是说还有数据没有读,但是这个时候,就去处理下1个selectionKey,那么当调用下1次selector.select方法时,仍然会由于有数据要读取,而被唤醒,仍然是对应该channel的selectionKey的可读事件。
// 可写:当客户端或服务端需要将数据发送出去,这时候,需要订阅可写事件,当发送缓冲区可写时,就会触发这个事件,然后使用channel将数据写出到缓冲区,假设1次没写完,那就需要继续订阅可写事件,直到全部数据写完,然后数据全部写完之后,取消订阅可写事件,然后又有数据需要发送,就再次订阅可写事件,写完之后,就取消订阅可写事件。
// 可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 可读 或 可接入事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 ||
readyOps == 0) {
// (这个方法同时处理 可接入 和 可读 事件,因为如果是NioServerSocketChannel它感兴趣的是OP_ACCEPT事件,而如果是NioSocketChannel它感兴趣的是OP_READ事件,对应的unsafe是不一样的。)
// 如果是可接入 AbstractNioMessageChannel.NioMessageUnsafe#read
// 如果是可读 AbstractNioByteChannel.NioByteUnsafe#read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
1.3 accept 剖析
其中,前面3步,在NioEventLoop#processSelectedKey中已经做过分析了。
nio 中如下代码,在 netty 中的流程
//1 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
//2 拿到一个事件
SelectionKey key = iter.next();
//3 如果是 accept 事件
if (key.isAcceptable()) {
//4 执行 accept
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
//5 关注 read 事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
}
入口代码
// 服务端
public class TestSourceServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
}
})
.bind(8888);
}
}
// 客户端
public class TestSourceClient {
public static void main(String[] args) {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
}
})
.connect(new InetSocketAddress("localhost", 8888));
}
}
AbstractNioMessageChannel.NioMessageUnsafe#read
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// doReadMessages中执行了accept并创建【NioSocketChannel】作为消息放入readBuf
// readBuf 是一个 ArrayList 用来缓存消息
// (看下面的doReadMessages方法)
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// localRead 为 1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t; // 忽略暂时的异常
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 read 事件,让NioServerSocketChannel的pipelin上的 handler 处理,
// 这时 肯定交给 ServerBootstrapAcceptor#channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 触发读取完毕事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
// 触发异常事件
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 获取到SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 创建NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
ServerBootstrapAcceptor#channelRead
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的 msg 是 NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel 添加 childHandler 即初始化器
// (这里添加的是初始化器)
child.pipeline().addLast(childHandler);
// 设置选项
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
AbstractChannel.AbstractUnsafe#register
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register
方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 一些检查,略...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的事实是 nio boss -> nio worker 线程的切换!!!
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 调用注册的方法
register0(promise);
}
});
} catch (Throwable t) {
// 日志记录...
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
*AbstractUnsafe#register0
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 这里面在 AbstractNioChannel 的 doRegister()方法中会将channel注册到selector上
doRegister();
neverRegistered = false;
registered = true;
//【关键代码,注意初始化器执行前后。这里将会为NiosocketChannel添加自定义的handler。】
// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> logging handler -> my handler -> tail
// 上面将客户端的channel已经配置好了,所以通知promise已经成功设置了
safeSetSuccess(promise);
// 触发handler的 channelRegistered事件
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 触发 pipeline 上 active 事件
// (这里就会在HeadContext#channelActive中让channel关注可读事件)
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
- HeadContext#channelActive
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
// (注册可读事件)
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
// 进入该调用,经过pipeline逐链调用,会来到HeadContext的read方法
channel.read();
}
}
@Override
public void read(ChannelHandlerContext ctx) {
// 接着这里调用到了AbstractNioChannel#doBeginRead
unsafe.beginRead();
}
–AbstractNioChannel#doBeginRead
io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 这时候 interestOps 是 0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注 read 事件!!!
selectionKey.interestOps(interestOps | readInterestOp);
}
}
1.4 read 剖析
接着NioEventLoop#processSelectedKey的那节,当对方发送消息来时。
AbstractNioByteChannel.NioByteUnsafe#read
再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// io.netty.allocator.type 决定 allocator 的实现
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
// 上面还是空的byteBuf
// 读取(这里就会将缓冲区中的数据读取到byteBuf中,
// 调用NioSocketChannel#doReadBytes)
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发read事件,让pipeline上的handler处理,这时是处理 NioSocketChannel上的 handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发 read complete 事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
NioSocketChannel#doReadBytes
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
// 读取数据
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
MaxMessageHandle#continueReading
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为 true
config.isAutoRead() &&
// respectMaybeMoreData 默认为 true
// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead 默认 16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}
1.5 write剖析
public class TestSourceServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
}
})
.bind(8888);
}
}
public class TestSourceClient {
public static void main(String[] args) {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().write("halo");
}
});
}
})
.connect(new InetSocketAddress("localhost", 8888));
}
}
HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
write:写队列
我们来看看channel中unsafe的write方法,先来看看其中的一个属性
AbstractUnsafe
protected abstract class AbstractUnsafe implements Unsafe {
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
我们来看看 ChannelOutboundBuffer 这个类
public final class ChannelOutboundBuffer {
private final Channel channel;
private ChannelOutboundBuffer.Entry flushedEntry;
private ChannelOutboundBuffer.Entry unflushedEntry;
private ChannelOutboundBuffer.Entry tailEntry;
ChannelOutboundBuffer内部维护了一个Entry链表,并使用Entry封装msg。其中的属性我们下面会详细讲
我们回到正题,接着看 unsafe.write(msg, promise);
AbstractUnsafe
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
1.调用 filterOutboundMessage() 方法,将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
2.接下来,估算出需要写入的ByteBuf的size
3.最后,调用 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情
ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 创建一个待写出的消息节点
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(size, false);
}
想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图
hannelOutboundBuffer 里面的数据结构是一个单链表结构,每个节点是一个 Entry,Entry 里面包含了待写出ByteBuf 以及消息回调 promise,下面分别是三个指针的作用
1.flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点
2.unFlushedEntry 指针表示第一个未被写入到操作系统Socket缓冲区中的节点
3.tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点
初次调用 addMessage 之后,各个指针的情况为
fushedEntry指向空,unFushedEntry和 tailEntry 都指向新加入的节点
第二次调用 addMessage之后,各个指针的情况为
第n次调用 addMessage之后,各个指针的情况为
可以看到,调用n次addMessage,flushedEntry指针一直指向NULL,表示现在还未有节点需要写出到Socket缓冲区,而unFushedEntry之后有n个节点,表示当前还有n个节点尚未写出到Socket缓冲区中去
flush:刷新写队列
不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点
HeadContext
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
之后进入到AbstractUnsafe
AbstractUnsafe
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
flush方法中,先调用 outboundBuffer.addFlush();
ChannelOutboundBuffer
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
可以结合前面的图来看,首先拿到 unflushedEntry 指针,然后将 flushedEntry 指向unflushedEntry所指向的节点,调用完毕之后,三个指针的情况如下所示
相当于所有的节点都即将开始推送出去
接下来,调用 flush0();
AbstractUnsafe
protected void flush0() {
doWrite(outboundBuffer);
}
发现这里的核心代码就一个 doWrite,继续跟
AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
// 拿到第一个需要flush的节点的数据
Object msg = in.current();
if (msg instanceof ByteBuf) {
// 强转为ByteBuf,若发现没有数据可读,直接删除该节点
ByteBuf buf = (ByteBuf) msg;
boolean done = false;
long flushedAmount = 0;
// 拿到自旋锁迭代次数
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
// 自旋,将当前节点写出
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
// 写完之后,将当前节点删除
if (done) {
in.remove();
} else {
break;
}
}
}
}
这里略微有点复杂,我们分析一下
1.第一步,调用current()先拿到第一个需要flush的节点的数据
ChannelOutBoundBuffer
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
2.第二步,拿到自旋锁的迭代次数
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
3.自旋的方式将ByteBuf写出到jdk nio的Channel
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
doWriteBytes 方法跟进去
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
我们发现,出现了 javaChannel(),表明已经进入到了jdk nio Channel的领域,我们来看看 buf.readBytes(javaChannel(), expectedWrittenBytes);
public int readBytes(GatheringByteChannel out, int length) throws IOException {
this.checkReadableBytes(length);
int readBytes = this.getBytes(this.readerIndex, out, length);
this.readerIndex += readBytes;
return readBytes;
}
我们来看关键代码 this.getBytes(this.readerIndex, out, length)
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
this.checkIndex(index, length);
if (length == 0) {
return 0;
} else {
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = this.internalNioBuffer();
} else {
tmpBuf = ((ByteBuffer)this.memory).duplicate();
}
index = this.idx(index);
tmpBuf.clear().position(index).limit(index + length);
//将tmpBuf中的数据写到out中
return out.write(tmpBuf);
}
}
我们来看看out.write(tmpBuf)
public int write(ByteBuffer src) throws IOException {
ensureOpen();
if (!writable)
throw new NonWritableChannelException();
synchronized (positionLock) {
int n = 0;
int ti = -1;
try {
begin();
ti = threads.add();
if (!isOpen())
return 0;
do {
n = IOUtil.write(fd, src, -1, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end(n > 0);
assert IOStatus.check(n);
}
}
}
和read实现一样,SocketChannelImpl的write方法通过IOUtil的write实现:关键代码 n = IOUtil.write(fd, src, -1, nd);
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
//如果是DirectBuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送
if (var1 instanceof DirectBuffer) {
return writeFromNativeBuffer(var0, var1, var2, var4);
} else {
//非DirectBuffer
//获取已经读取到的位置
int var5 = var1.position();
//获取可以读到的位置
int var6 = var1.limit();
assert var5 <= var6;
//申请一个原buffer可读大小的DirectByteBuffer
int var7 = var5 <= var6 ? var6 - var5 : 0;
ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
int var10;
try {
var8.put(var1);
var8.flip();
var1.position(var5);
//通过DirectBuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送
int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
if (var9 > 0) {
var1.position(var5 + var9);
}
var10 = var9;
} finally {
//回收分配的DirectByteBuffer
Util.offerFirstTemporaryDirectBuffer(var8);
}
return var10;
}
}
代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filterOutboundMessage(),将待写入的对象过滤,把非ByteBuf对象和FileRegion过滤,把所有的非直接内存转换成直接内存DirectBuffer
说明到了这一步所有的 var1 意境是直接内存DirectBuffer,就不需要走到else,就不需要write两次了
4.删除该节点
节点的数据已经写入完毕,接下来就需要删除该节点
ChannelOutBoundBuffer
public boolean remove() {
Entry e = flushedEntry;
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
}
// recycle the entry
e.recycle();
return true;
}
首先拿到当前被flush掉的节点(flushedEntry所指),然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()方法移除该节点
private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
这里的remove是逻辑移除,只是将flushedEntry指针移到下个节点,调用完毕之后,节点图示如下
writeAndFlush: 写队列并刷新
理解了write和flush这两个过程,writeAndFlush 也就不难了
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
}
}
可以看到,最终,通过一个boolean变量,表示是调用 invokeWriteAndFlush,还是 invokeWrite,invokeWrite便是我们上文中的write过程
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
invokeFlush0();
}
可以看到,最终调用的底层方法和单独调用 write 和 flush 是一样的
private void invokeWrite(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
}
private void invokeFlush(Object msg, ChannelPromise promise) {
invokeFlush0(msg, promise);
}
由此看来,invokeWriteAndFlush基本等价于write方法之后再来一次flush