一、IO模型
同步IO:
- 阻塞IO
- 非阻塞IO
- 信号驱动IO
- IO多路复用
异步IO:异步IO都是非阻塞的
同步非阻塞和异步IO的区别:
- Non-Blocking IO在数据包准备时是非阻塞的,但是在将数据从内核空间拷贝到用户空间还是会阻塞(IO)。
- 异步IO是当进程发起IO操作之后,就直接返回再也不理睬,由内核完成读写,读写完成之后Kernel发送一个信号,告诉线程IO完成。在整个过程中,发起IO的线程没有任何阻塞。
二、Netty组件
2.1 EventLoop
负责处理事件,有Boss和Worker两种,一个EventLoop中只有一个负责IO的线程(Boss/Worker)。一个Channel只能绑定一个EventLoop,一个EventLoop可以管理多个Channel。
EventLoopGroup:一组EventLoop,channel会调用EventLoopGroup中的注册方法绑定其中的一个EventLoop
作用:提交任务、执行定时任务、处理IO事件
2.2 Channel
首先,服务端在启动监听端口时,会建立连接端口的channel,并将该channel交给selector管理,这个channel是全局唯一的(也就是多个客户端发起连接请求都是使用同一个channel)。当客户端使用该channel完成连接时,会为连接建立专属channel(不同的连接使用不同的channel)
注意:这里的读指的是服务端读入,写指的是服务端写出。(读入缓冲区,从缓冲区写出)bytebuffer
由于初始时缓冲区的内容是空的,所以连接建立时,如果没有读,则写操作是没有意义的,所以默认将channel设置为读使用,但是如果是服务端自己造数据直接写入的话,还是可以写入的
客户端在断开连接后会出发一个读事件
最好为channel在交给selector时绑定一个附件(attachment)通常是缓冲区
一个EventLoop相对于一个流水线厂,channel相当于产品,需要在流水线中的各个流程中被处理,而每一个处理流程就是一个Handler,一个Channel只能和一个EventLoop关联,一个EventLoop可以处理很多channel
2.3 Future & Promise
jdk future (父类) —> netty future —> netty promise
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
promise.setSuccess(Integer); // 向promise容器中存入值
promise.get(Integer); //从promise容器中获取值,如果容器中没有值,线程会被阻塞
promise容器中只能存一个值
2.4 handler & pipline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
分类:入站和出站。
- 当读取数据时,就是入站
- 写出数据时就是出站
- 入站按照加入pipline的handler的顺序执行,出站相反
ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
- 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 1 处代码,则仅会打印 1
- 如果注释掉 2 处代码,则仅会打印 1 2
- 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 3 处代码,则仅会打印 1 2 3
- 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 6 处代码,则仅会打印 1 2 3 6
- ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己
图1 - 服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
Head -> In_1 -> In_2 ->…-> tail -> Out_1(倒叙,先加入Pipline后执行) -> Out_2 -> …-> Head
2.5 ByteBuf
自动扩容
heap buffer和direct buffer
池化/非池化
三、零拷贝
Java零拷贝、网络传输零拷贝、Netty零拷贝的区别:
- java零拷贝:
- 网络传输零拷贝
- Netty的零拷贝
ByteBuf buf = ByteBuf.alloc(10);
ByteBuf f1 = buf.slice(0,5);
ByteBuf f1 = buf.slice(5,5);
四、Reactor模型
Netty其实就是Reactor模型的一个实例
4.1 Reactor三种线程模型
Reactor线程:负责处理IO事件(建立连接、读、写)。
- 单Reactor单线程模型:监听连接和读写IO使用同一个Reactor线程,且后续处理共用一个线程,在Netty中就是EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup。
- 单Reactor多线程模型:监听连接和读写IO使用的还是同一个Reactor线程,但是后续处理非IO请求(业务处理)使用的是多线程处理。例如:将数据IO读取到之后,进行数据计算的处理(对应netty中的handler)
- Reactor主从线程模型:监听端口的IO线程一个,处理读写的IO线程有多个。EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。
多线程模型和Reactor主从的区别:
主从Reactor中,主Reactor只监听端口和建立连接(只有一个线程),然后将连接交给子Reactor管理,每个子Reactor中包含一个Reactor线程,负责监听并处理主Reactor发放过来的连接的IO请求。而多线程模型中,只有一个IO线程。
模型 | I/O线程数量 | 业务处理线程数量 |
---|---|---|
单Reactor单线程 | 1个线程处理所有I/O | 1个线程(和IO线程是同一个) |
单Reactor多线程 | 1个线程处理I/O监听 | 多线程 |
主从Reactor | 1个主负责监听建立连接,多子Reactor线程并行处理I/O读写 | 多线程 |
所以这里的Reactor是指处理IO事件的线程,单/多线程是Handler的线程
Netty模型 : Reactor主从模型
Netty采用的就是Reactor主从模型,其中Channel可以理解为是一个建立好的连接,
当监听端口线程(主Reactor中的Reactor线程)建立连接(.accept())后,为连接创建一个独有的Channel,并分配给一个EventLoop(EventLoop是一个线程,负责处理IO请求,也就是子Reactor中的Reactor线程),然后进行业务处理(netty中对应的是ChannelPipline的Handler链)。
EventLoop VS ChannelPipline
ChannelPipline是流水线,处理数据,Handler是每一个处理点
EventLoop可以理解为是IO请求处理线程(读写IO)(包含线程,还有其他的扩展熟悉),将读取到的数据放到ChannelPipline进行处理,或者将ChannelPipline处理的数据进行IO发送。
在Netty中,Selector的本质就是Reactor线程的I/O事件轮询器。
4.2 selector空轮询以及Netty的解决方案
空轮询:
是操作系统的bug,本来预期是没有IO请求的时候,reactor线程应该阻塞,但是os会返回0,使得线程没有阻塞,返回0之后继续监听返回0,一直占用CPU。
解决方案
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug,则计数器+1,如果计数器超过阙值,则重建Selector并迁移所有其管理的channel。
4.3 粘包和拆包
Nagle算法
将发送的数据先存到发送缓冲区中,当积累达到一定大小时才发送。Linux默认开启Nagle,Netty默认关闭nagle
Netty的帧解码器
粘包拆包处理:
- 固定长度数据包
- 特定字符分割
- 设置 头部数据,指定数据长度
Netty中的解码器:
- 固定长度解码器
- 固定字符分割解码器
- 长度域解码器 LengthFieldBasedFrameDecoder (帧解码器)
// 长度字段的偏移量,也就是存放长度数据的起始位置
private final int lengthFieldOffset;
// 长度字段所占用的字节数
private final int lengthFieldLength;
/*
* 消息长度的修正值
*
* 在很多较为复杂一些的协议设计中,长度域不仅仅包含消息的长度,而且包含其他的数据,如版本号、数据类型、数据状态等,那么这时候我们需要使用 lengthAdjustment 进行修正
*
* lengthAdjustment = 包体的长度值 - 长度域的值
*
*/
private final int lengthAdjustment;
// 解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
private final int initialBytesToStrip;
// 长度字段结束的偏移量,lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
private final int lengthFieldEndOffset;
- lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。
- lengthFieldLength = 2,协议设计的固定长度。
- lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。
- initialBytesToStrip = 0,解码后内容依然是 Length + Content,不需要跳过任何初始字节。
- lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。
- lengthFieldLength = 2,协议设计的固定长度。
- lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。
- initialBytesToStrip = 2,跳过 Length 字段的字节长度,解码后 ByteBuf 中只包含 Content字段。
4.4 WriteAndFlush
分为 write和flush,write是将数据写入ChannelOutboundBuffer中,flush是写入
ChannelOutBuffer的结构:链表结构,三个指针 (第一个呗写道缓冲区的节点,第一个未被写到缓冲区的节点和最后一个节点)
write
每一个msg都是一个entry
写入的时候会判断当前线程是否是负责IO的Reactor线程,如果不是就见写任务存入队列中稍后由IO线程执行(这样就可以保证所有写IO操作都由一个线程执行,线程安全)
flush
flushedEntry 指针变更为 unflushedEntry 指针所指向的数据,然后 unflushedEntry 指针指向 NULL,flushedEntry 指针指向的数据才会被真正发送到 Socket 缓冲区。
ChannelOutboundBuffer中维护了一个值,记录存在待发送至socket发送缓冲区的字节数,如果超过阙值,则Channel变为不可写状态,如果低于则可写。writer会增加这个值,flush会减少这个值
flush0():真正将数据写入Socket发送缓冲区的方法
// AbstractNioUnsafe # flush0
@Override
protected final void flush0() {
if (!isFlushPending()) {
super.flush0();
}
}
// AbstractNioByteChannel # doWrite
@Override
// flush0的核心逻辑
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// 获取自选锁的最大轮询次数,数据不能一次就写入SOcket发送缓冲区,所以每次写都要获取锁来向socket发送缓冲区写入,在自选过程中,eventloop无法去处理其他IO事件,所以不能自选过多次,超过一定时间就暂时中断写入。
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current();
if (msg == null) {
clearOpWrite();
return;
}
//每次尝试写入socket,自旋锁次数减一(doWriteInternal,JDK NIO 底层)
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
- writeAndFlush 属于出站操作,它是从 Pipeline 的 Tail 节点开始进行事件传播,一直向前传播到 Head 节点。不管在 write 还是 flush 过程,Head 节点都中扮演着重要的角色。
- write 方法并没有将数据写入 Socket 缓冲区,只是将数据写入到 ChannelOutboundBuffer 缓存中,ChannelOutboundBuffer 缓存内部是由单向链表实现的。
- flush 方法才最终将数据写入到 Socket 缓冲区。等待操作系统发送
4.5 分配堆外内存
ByteBuf .allocateDirect(size)
在堆内存放的 DirectByteBuffer 对象并不大,仅仅包含堆外内存的地址、大小等属性,同时还会创建对应的 Cleaner 对象,通过 ByteBuffer 分配的堆外内存不需要手动回收,它可以被 JVM 自动回收。当堆内的 DirectByteBuffer 对象被 GC 回收时,Cleaner 就会用于回收对应的堆外内存
4.6 Netty的对象池(Recycler)
目的:实现对象的复用,减少对象的频繁创建和GC。
注意事项:
- 线程池管理的是同一类对象,创建线程池时候需要指定泛型Recycler<Student>
- 这里线程A和线程B的线程池中的对象是同一类对象
- 每个线程都会持有各自的对象池,内部通过 FastThreadLocal 来实现每个线程的私有化。
使用流程:
A创建的对象放在线程池中,B获取A的对象池中的对象,使用之后。放入WeakOrderQueue,等待A回收。线程获取本对象池的对象,从本地的栈中获取(对象池的本质是一个栈)。如果本地线程池中没有对象,则从WeakOrderQueue中获取。如果还是没有则创建对象。
组件说明:
- Handle:每一个对象的适配器,负责获取、回收对象。
- WeakOrderQueue:可以理解为线程之间的通信方式(负责对象回收间的通讯),每个线程存有对应其他线程的WeakOrderQueue,当本线程使用对象(例如A使用了B的对象)结束后,就存入对应线程的WeakOrderQueue中(A将对象存入A中对应B的WeakOrderQueue),等待线程回收。
线程如何获取其他线程的对象(可以通过对象传递),如:
// 伪代码示例
channel.read().addListener(future -> {
ByteBuf buf = (ByteBuf) future.get(); // 线程A创建的对象
executorService.execute(() -> { // 提交到线程B执行
process(buf); // 线程B使用线程A的对象
buf.release(); // 线程B尝试回收
});
});