Netty入门学习

发布于:2025-07-21 ⋅ 阅读:(13) ⋅ 点赞:(0)

一、IO模型

同步IO:

  • 阻塞IO
  • 非阻塞IO
  • 信号驱动IO
  • IO多路复用

异步IO:异步IO都是非阻塞的

同步非阻塞和异步IO的区别:

  • Non-Blocking IO在数据包准备时是非阻塞的,但是在将数据从内核空间拷贝到用户空间还是会阻塞(IO)。
  • 异步IO是当进程发起IO操作之后,就直接返回再也不理睬,由内核完成读写,读写完成之后Kernel发送一个信号,告诉线程IO完成。在整个过程中,发起IO的线程没有任何阻塞。

二、Netty组件

2.1 EventLoop

负责处理事件,有Boss和Worker两种,一个EventLoop中只有一个负责IO的线程(Boss/Worker)。一个Channel只能绑定一个EventLoop,一个EventLoop可以管理多个Channel

EventLoopGroup:一组EventLoop,channel会调用EventLoopGroup中的注册方法绑定其中的一个EventLoop

作用:提交任务、执行定时任务、处理IO事件

2.2 Channel

首先,服务端在启动监听端口时,会建立连接端口的channel,并将该channel交给selector管理,这个channel是全局唯一的(也就是多个客户端发起连接请求都是使用同一个channel)。当客户端使用该channel完成连接时,会为连接建立专属channel(不同的连接使用不同的channel)

注意:这里的读指的是服务端读入,写指的是服务端写出。(读入缓冲区,从缓冲区写出)bytebuffer

由于初始时缓冲区的内容是空的,所以连接建立时,如果没有读,则写操作是没有意义的,所以默认将channel设置为读使用但是如果是服务端自己造数据直接写入的话,还是可以写入的

客户端在断开连接后会出发一个读事件

最好为channel在交给selector时绑定一个附件(attachment)通常是缓冲区

一个EventLoop相对于一个流水线厂,channel相当于产品,需要在流水线中的各个流程中被处理,而每一个处理流程就是一个Handler,一个Channel只能和一个EventLoop关联,一个EventLoop可以处理很多channel

2.3 Future & Promise

jdk future (父类) —> netty future —> netty promise

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
promise.setSuccess(Integer);  // 向promise容器中存入值
promise.get(Integer);     //从promise容器中获取值,如果容器中没有值,线程会被阻塞 

promise容器中只能存一个值

2.4 handler & pipline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

分类:入站和出站。

  1. 当读取数据时,就是入站
  2. 写出数据时就是出站
  3. 入站按照加入pipline的handler的顺序执行,出站相反

ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
****

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)
    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己

图1 - 服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
在这里插入图片描述
Head -> In_1 -> In_2 ->…-> tail -> Out_1(倒叙,先加入Pipline后执行) -> Out_2 -> …-> Head

2.5 ByteBuf

自动扩容

heap buffer和direct buffer

池化/非池化
在这里插入图片描述

三、零拷贝

Java零拷贝、网络传输零拷贝、Netty零拷贝的区别

  • java零拷贝:
    在这里插入图片描述
  • 网络传输零拷贝
    在这里插入图片描述
  • Netty的零拷贝
    在这里插入图片描述
ByteBuf buf = ByteBuf.alloc(10);
ByteBuf f1 = buf.slice(0,5);
ByteBuf f1 = buf.slice(5,5);

四、Reactor模型

Netty其实就是Reactor模型的一个实例

4.1 Reactor三种线程模型

Reactor线程:负责处理IO事件(建立连接、读、写)。

  1. 单Reactor单线程模型:监听连接和读写IO使用同一个Reactor线程,且后续处理共用一个线程,在Netty中就是EventLoopGroup 只包含一个 EventLoop,Boss 和 Worker 使用同一个EventLoopGroup。
  2. 单Reactor多线程模型:监听连接和读写IO使用的还是同一个Reactor线程,但是后续处理非IO请求(业务处理)使用的是多线程处理。例如:将数据IO读取到之后,进行数据计算的处理(对应netty中的handler
  3. Reactor主从线程模型:监听端口的IO线程一个,处理读写的IO线程有多个。EventLoopGroup 包含多个 EventLoop,Boss 是主 Reactor,Worker 是从 Reactor,它们分别使用不同的 EventLoopGroup,主 Reactor 负责新的网络连接 Channel 创建,然后把 Channel 注册到从 Reactor。

多线程模型和Reactor主从的区别:

主从Reactor中,主Reactor只监听端口和建立连接(只有一个线程),然后将连接交给子Reactor管理,每个子Reactor中包含一个Reactor线程,负责监听并处理主Reactor发放过来的连接的IO请求。而多线程模型中,只有一个IO线程。

模型 I/O线程数量 业务处理线程数量
单Reactor单线程 1个线程处理所有I/O 1个线程(和IO线程是同一个)
单Reactor多线程 1个线程处理I/O监听 多线程
主从Reactor 1个主负责监听建立连接,多子Reactor线程并行处理I/O读写 多线程

所以这里的Reactor是指处理IO事件的线程,单/多线程是Handler的线程

Netty模型 : Reactor主从模型

Netty采用的就是Reactor主从模型,其中Channel可以理解为是一个建立好的连接,

当监听端口线程(主Reactor中的Reactor线程)建立连接(.accept())后,为连接创建一个独有的Channel,并分配给一个EventLoop(EventLoop是一个线程,负责处理IO请求,也就是子Reactor中的Reactor线程),然后进行业务处理(netty中对应的是ChannelPipline的Handler链)。

EventLoop VS ChannelPipline

ChannelPipline是流水线,处理数据,Handler是每一个处理点

EventLoop可以理解为是IO请求处理线程(读写IO)(包含线程,还有其他的扩展熟悉),将读取到的数据放到ChannelPipline进行处理,或者将ChannelPipline处理的数据进行IO发送。
在这里插入图片描述
在Netty中,Selector的本质就是Reactor线程的I/O事件轮询器

4.2 selector空轮询以及Netty的解决方案

空轮询:

是操作系统的bug,本来预期是没有IO请求的时候,reactor线程应该阻塞,但是os会返回0,使得线程没有阻塞,返回0之后继续监听返回0,一直占用CPU。

解决方案

long time = System.nanoTime();

if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
    selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
    selector = selectRebuildSelector(selectCnt);
    selectCnt = 1;
    break;
}

如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug,则计数器+1,如果计数器超过阙值,则重建Selector并迁移所有其管理的channel。

4.3 粘包和拆包

Nagle算法

将发送的数据先存到发送缓冲区中,当积累达到一定大小时才发送。Linux默认开启Nagle,Netty默认关闭nagle

Netty的帧解码器

粘包拆包处理:

  1. 固定长度数据包
  2. 特定字符分割
  3. 设置 头部数据,指定数据长度

Netty中的解码器:

  • 固定长度解码器
  • 固定字符分割解码器
  • 长度域解码器 LengthFieldBasedFrameDecoder (帧解码器)
// 长度字段的偏移量,也就是存放长度数据的起始位置
private final int lengthFieldOffset; 
// 长度字段所占用的字节数
private final int lengthFieldLength; 
/*
 * 消息长度的修正值
 *
 * 在很多较为复杂一些的协议设计中,长度域不仅仅包含消息的长度,而且包含其他的数据,如版本号、数据类型、数据状态等,那么这时候我们需要使用 lengthAdjustment 进行修正
 * 
 * lengthAdjustment = 包体的长度值 - 长度域的值
 *
 */
private final int lengthAdjustment; 
// 解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
private final int initialBytesToStrip;
// 长度字段结束的偏移量,lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
private final int lengthFieldEndOffset;

在这里插入图片描述

  • lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。
  • lengthFieldLength = 2,协议设计的固定长度。
  • lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。
  • initialBytesToStrip = 0,解码后内容依然是 Length + Content,不需要跳过任何初始字节。

在这里插入图片描述

  • lengthFieldOffset = 0,因为 Length 字段就在报文的开始位置。
  • lengthFieldLength = 2,协议设计的固定长度。
  • lengthAdjustment = 0,Length 字段只包含消息长度,不需要做任何修正。
  • initialBytesToStrip = 2,跳过 Length 字段的字节长度,解码后 ByteBuf 中只包含 Content字段。

4.4 WriteAndFlush

分为 write和flush,write是将数据写入ChannelOutboundBuffer中,flush是写入

ChannelOutBuffer的结构:链表结构,三个指针 (第一个呗写道缓冲区的节点,第一个未被写到缓冲区的节点和最后一个节点)

write

在这里插入图片描述

每一个msg都是一个entry

写入的时候会判断当前线程是否是负责IO的Reactor线程,如果不是就见写任务存入队列中稍后由IO线程执行(这样就可以保证所有写IO操作都由一个线程执行,线程安全)

flush

在这里插入图片描述
flushedEntry 指针变更为 unflushedEntry 指针所指向的数据,然后 unflushedEntry 指针指向 NULL,flushedEntry 指针指向的数据才会被真正发送到 Socket 缓冲区。

ChannelOutboundBuffer中维护了一个值,记录存在待发送至socket发送缓冲区的字节数,如果超过阙值,则Channel变为不可写状态,如果低于则可写。writer会增加这个值,flush会减少这个值

flush0():真正将数据写入Socket发送缓冲区的方法

// AbstractNioUnsafe # flush0

@Override

protected final void flush0() {
    if (!isFlushPending()) {
        super.flush0();
    }
}

// AbstractNioByteChannel # doWrite

@Override
// flush0的核心逻辑
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    // 获取自选锁的最大轮询次数,数据不能一次就写入SOcket发送缓冲区,所以每次写都要获取锁来向socket发送缓冲区写入,在自选过程中,eventloop无法去处理其他IO事件,所以不能自选过多次,超过一定时间就暂时中断写入。
    int writeSpinCount = config().getWriteSpinCount();
    do {
        Object msg = in.current();
        if (msg == null) {
            clearOpWrite();
            return;
        }
        //每次尝试写入socket,自旋锁次数减一(doWriteInternal,JDK NIO 底层)
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);
    incompleteWrite(writeSpinCount < 0);
}
  • writeAndFlush 属于出站操作,它是从 Pipeline 的 Tail 节点开始进行事件传播,一直向前传播到 Head 节点。不管在 write 还是 flush 过程,Head 节点都中扮演着重要的角色。
  • write 方法并没有将数据写入 Socket 缓冲区,只是将数据写入到 ChannelOutboundBuffer 缓存中,ChannelOutboundBuffer 缓存内部是由单向链表实现的。
  • flush 方法才最终将数据写入到 Socket 缓冲区。等待操作系统发送

4.5 分配堆外内存

ByteBuf .allocateDirect(size)

在堆内存放的 DirectByteBuffer 对象并不大,仅仅包含堆外内存的地址、大小等属性,同时还会创建对应的 Cleaner 对象,通过 ByteBuffer 分配的堆外内存不需要手动回收,它可以被 JVM 自动回收。当堆内的 DirectByteBuffer 对象被 GC 回收时,Cleaner 就会用于回收对应的堆外内存

4.6 Netty的对象池(Recycler)

在这里插入图片描述
目的:实现对象的复用,减少对象的频繁创建和GC。
注意事项

  1. 线程池管理的是同一类对象,创建线程池时候需要指定泛型Recycler<Student>
  2. 这里线程A和线程B的线程池中的对象是同一类对象
  3. 每个线程都会持有各自的对象池,内部通过 FastThreadLocal 来实现每个线程的私有化。

使用流程:

A创建的对象放在线程池中,B获取A的对象池中的对象,使用之后。放入WeakOrderQueue,等待A回收。线程获取本对象池的对象,从本地的栈中获取(对象池的本质是一个栈)。如果本地线程池中没有对象,则从WeakOrderQueue中获取。如果还是没有则创建对象。

组件说明

  • Handle:每一个对象的适配器,负责获取、回收对象。
  • WeakOrderQueue:可以理解为线程之间的通信方式(负责对象回收间的通讯),每个线程存有对应其他线程的WeakOrderQueue,当本线程使用对象(例如A使用了B的对象)结束后,就存入对应线程的WeakOrderQueue中(A将对象存入A中对应B的WeakOrderQueue),等待线程回收。

线程如何获取其他线程的对象(可以通过对象传递),如:

// 伪代码示例
channel.read().addListener(future -> {
    ByteBuf buf = (ByteBuf) future.get(); // 线程A创建的对象
    executorService.execute(() -> {       // 提交到线程B执行
        process(buf);                    // 线程B使用线程A的对象
        buf.release();                   // 线程B尝试回收
    });
});

网站公告

今日签到

点亮在社区的每一天
去签到