文章目录
目录
二、NIO(Non-blocking I/O):同步非阻塞模型
前言
BIO->NIO->AIO的转变历程
传统BIO的问题在于高并发时的线程开销,而NIO通过多路复用减少了线程数量,提高了并发能力。但NIO的编程模型复杂,需要处理事件循环、缓冲区管理等。AIO则进一步简化,利用操作系统的异步支持,实现真正的非阻塞,但可能在某些系统上支持不够好,比如Linux的AIO实现不如Windows成熟,所以Netty还是基于NIO。
一、BIO(Blocking I/O):同步阻塞模型
核心机制:
每个连接对应一个线程,线程在读写操作时会被阻塞,直到数据就绪。
1.示例
直接看一个代码示例 socket建立连接发送数据(对于每个连接创建一个线程,实现异步通信)
首先需要了解BIO的阻塞点1.连接2.读写
// 服务端代码
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8080);
System.out.println("BIO 服务端启动,监听端口 8080...");
while (true) {
// 阻塞点1:等待客户端连接(没有连接时线程挂起)
Socket client = server.accept();
System.out.println("客户端连接:" + client.getRemoteSocketAddress());
// 为每个连接创建新线程处理读写
new Thread(() -> {
try {
// 阻塞点2:读取客户端数据(无数据时线程挂起)
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) { // 阻塞直到有数据
System.out.println("收到客户端消息: " + line);
// 阻塞点3:向客户端写回数据(输出流满时阻塞)
OutputStream out = client.getOutputStream();
out.write(("服务端响应: " + line + "\n").getBytes());
out.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
}
这里使用多线程的方式虽然解决了单线程阻塞的情况,但是对每个连都创建一个线程进行读写,如果这时候连接多了,线程太大,占用的资源可想而知
二、NIO(Non-blocking I/O):同步非阻塞模型
为什么会出现NIO呢,其实就是为了解决BIO单线程阻塞问题(读写阻塞,连接阻塞),实现单线程解决并发问题(多路复用)
核心机制:
多路复用器(Selector)轮询事件,单线程处理多个连接,核心是非阻塞和事件驱动。
关键组件:
Channel(通道):双向通信,替代BIO的流,支持非阻塞模式。
Buffer(缓冲区):数据读写的中转区,需配合
flip()
、clear()
操作。Selector(选择器):监控多个通道的事件(如连接、读、写)。
1.Buffer
底层结构
Buffer的底层实现是一个数组,具体类型取决于Buffer的子类,例如
- ByteBuffer底层是byte[]数组;
- IntBuffer底层是int[]数组;
- 其他类型如CharBuffer、DoubleBuffer等同理。
核心属性
java nio包下看到buffer类里面有这4个核心属性
capacity:数组的固定长度,创建后不可变。例如,ByteBuffer.allocate(10)会分配一个长度为10的byte[]数组。
position:指向下一个待读写的位置,初始为0,每次读写后自动递增。
limit:读写操作的上限。写模式下,limit等于capacity;读模式下,limit等于有效数据的末尾位置(由flip()方法设置)。
mark:通过mark()方法保存position的某个状态,后续可通过reset()恢复到此位置。
总结
Buffer本质上是封装了数组的内存块,通过capacity(容量)、position(读写位置)、limit(操作上限)、mark(标记位)四个属性管理数据流。例如,ByteBuffer底层是byte[]数组,其他类型如IntBuffer同理
核心功能
缓冲作用:在I/O操作中,Channel(通道)与Buffer配合,实现数据的批量读写。例如,写数据时先填充Buffer再批量写入Channel,减少直接操作磁盘或网络的次数,提升效率。
状态切换:通过flip()切换读写模式(写模式下limit=capacity,读模式下limit=position),以及clear()/compact()清空或压缩数据。
channal:
定位:替代传统 BIO 的 Stream,提供双向数据传输能力。
底层接口
主要实现类
类名 | 用途 |
---|---|
FileChannel |
文件 I/O(不支持非阻塞模式) |
SocketChannel |
TCP 网络套接字(客户端/服务端) |
ServerSocketChannel |
TCP 服务端监听套接字 |
DatagramChannel |
UDP 数据报通道 |
Pipe.SinkChannel /SourceChannel |
线程间通信通道 |
Selector(选择器):多路复用控制器
单线程管理多个通道的核心组件,实现 "1个线程处理1000连接"。
是NIO的核心,通过监控多个Channal 实现单个线程并发处理多个请求
selector也位于channals包下:
总结
NIO模型中,每个连接(无论是客户端连接还是服务端监听)都对应一个Channel,将这些Channel注册到同一个Selector上,由Selector统一监控这些Channel的I/O事件(如读、写、连接等)。Selector的核心作用就是用一个线程管理多个Channel,实现高并发。
代码示例:
服务器端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class nioTest {
private static final int PORT = 8080;
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) throws IOException {
// 1. 创建Selector(多路复用器)
Selector selector = Selector.open();
// 2. 创建ServerSocketChannel并配置非阻塞
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false); // 关键:设置为非阻塞模式
serverChannel.bind(new InetSocketAddress(PORT));
// 3. 注册ACCEPT事件到Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO 服务端启动,监听端口: " + PORT);
// 4. 事件循环(单线程处理所有连接)
while (true) {
// 阻塞等待就绪的通道(可设置超时)
int readyChannels = selector.select(1000); // 等待1秒
if (readyChannels == 0) {
System.out.println("等待事件中...");
continue;
}
// 5. 获取就绪的SelectionKey集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 6. 处理连接事件
if (key.isAcceptable()) {
handleAccept(key, selector);
}
// 7. 处理读事件
if (key.isReadable()) {
handleRead(key);
}
// 8. 处理写事件(通常只在需要时注册)
if (key.isWritable()) {
handleWrite(key);
}
// 9. 移除已处理的Key(必须!)
keyIterator.remove();
}
}
}
// 处理新连接
private static void handleAccept(SelectionKey key, Selector selector)
throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 10. 接受连接(非阻塞,立即返回)
SocketChannel clientChannel = serverChannel.accept();
if (clientChannel == null) return;
System.out.println("客户端连接: " + clientChannel.getRemoteAddress());
// 11. 配置客户端通道为非阻塞
clientChannel.configureBlocking(false);
// 12. 注册读事件,并附加Buffer
clientChannel.register(
selector,
SelectionKey.OP_READ,
ByteBuffer.allocate(BUFFER_SIZE) // 附加Buffer对象
);
}
// 处理读数据
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取附加的Buffer
// 13. 非阻塞读取(不会长时间阻塞!)
int bytesRead = clientChannel.read(buffer);
if (bytesRead == -1) { // 客户端关闭连接
System.out.println("客户端断开: " + clientChannel.getRemoteAddress());
clientChannel.close();
return;
}
if (bytesRead > 0) {
// 14. 切换Buffer为读模式
buffer.flip();
// 15. 处理数据(简单打印)
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String message = new String(data);
System.out.println("收到数据: " + message);
// 16. 准备写回响应(注册写事件)
key.interestOps(SelectionKey.OP_WRITE);
// 17. 保存响应数据(实际应用应更复杂)
key.attach(ByteBuffer.wrap(("ECHO: " + message).getBytes()));
// 18. 清空Buffer(或compact()处理半包)
buffer.clear();
}
}
// 处理写数据
private static void handleWrite(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 19. 非阻塞写入
while (buffer.hasRemaining()) {
clientChannel.write(buffer);
}
// 20. 重新注册读事件(取消写事件)
key.interestOps(SelectionKey.OP_READ);
// 21. 重置Buffer(为下次读准备)
buffer.clear();
}
}
客户端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NioClient {
public static void main(String[] args) throws IOException, InterruptedException {
// 1. 创建SocketChannel(非阻塞)
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// 2. 异步连接服务器(立即返回)
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
// 3. 等待连接完成(非阻塞方式)
while (!socketChannel.finishConnect()) {
System.out.println("连接建立中...");
Thread.sleep(300); // 模拟其他操作
}
System.out.println("连接服务器成功!");
// 4. 用户输入循环
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("输入消息: ");
String message = scanner.nextLine();
// 5. 非阻塞写入
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 6. 非阻塞读取响应
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(readBuffer);
if (bytesRead > 0) {
readBuffer.flip();
byte[] data = new byte[readBuffer.remaining()];
readBuffer.get(data);
System.out.println("收到响应: " + new String(data));
}
}
}
}
三、AIO
核心机制:
异步回调或Future机制,由操作系统完成IO操作后通知应用,无需应用线程等待。
关键组件:
AsynchronousServerSocketChannel
:异步处理连接。CompletionHandler
或Future
:处理完成事件或获取结果。
工作流程:
服务端通过
accept()
异步接收连接,立即返回。操作系统完成连接建立后,回调
CompletionHandler
。读写操作同理,数据就绪后触发回调。
四、redis中的多路复用和NIO中的多路复用
redis也使用了多路复用,但是底层
组件 | Java NIO | Redis |
---|---|---|
语言 | Java | C 语言 |
底层依赖 | JVM 封装的 epoll/kqueue |
直接调用操作系统的 epoll/kqueue |
实现级别 | JVM 用户空间 | 操作系统内核空间 |
四、总结
持续更新