https://www.bilibili.com/video/BV1paLfz8EUA/
https://github.com/implement-study/netty-study
NIO流程
BioClient.java
package org.example.bio;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
*
* BioClient 类,基于 BIO(阻塞 I/O)模型,通过多线程向服务器发送数据
*/
public class BioClient {
public static void main(String[] args) throws Exception {
// 创建名为 "tom" 的线程,用于执行 sendHello 方法
Thread tom = new Thread(() -> {
try {
sendHello();
} catch (Exception e) {
// 捕获异常并包装成运行时异常抛出
throw new RuntimeException(e);
}
}, "tom");
// 创建名为 "jerry" 的线程,用于执行 sendHello 方法
Thread jerry = new Thread(() -> {
try {
sendHello();
} catch (Exception e) {
// 捕获异常并包装成运行时异常抛出
throw new RuntimeException(e);
}
}, "jerry");
// 启动 tom 线程
tom.start();
// 启动 jerry 线程
jerry.start();
// 等待 tom 线程执行完毕,再继续执行后续代码
tom.join();
// 等待 jerry 线程执行完毕,再继续执行后续代码
jerry.join();
}
/**
* 向服务器发送数据的方法,会连接到本地 8080 端口的服务器,
* 循环发送包含当前线程名称和序号的消息
* @throws Exception 可能抛出的 IO 异常、 InterruptedException 等异常
*/
private static void sendHello() throws Exception {
// 创建 Socket 对象,用于建立网络连接
Socket socket = new Socket();
// 连接到本地(localhost)的 8080 端口
socket.connect(new InetSocketAddress("localhost", 8080));
// 获取 Socket 的输出流,用于向服务器发送数据
OutputStream outputStream = socket.getOutputStream();
// 循环 10 次发送数据
for (int i = 0; i < 10; i++) {
// 构造要发送的内容,包含当前线程名称、"hello " 及序号,转换为字节数组后发送
outputStream.write((Thread.currentThread().getName() + "hello " + i + "\n").getBytes());
// 强制将输出流缓冲区的数据立即发送出去
outputStream.flush();
}
// 线程休眠 10000 毫秒(10 秒),可根据实际需求调整或移除
Thread.sleep(10000);
}
}
BioServer.java
package org.example.bio;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BioServer 类,用于创建一个基于 BIO(阻塞 I/O)的简单服务器,
* 监听指定端口,接受客户端连接并读取客户端发送的数据
*/
public class BioServer {
public static void main(String[] args) throws Exception {
// 创建 ServerSocket,监听 8080 端口,用于接受客户端的连接请求
ServerSocket socket = new ServerSocket(8080);
while(true){
// 调用 accept 方法,阻塞等待客户端连接,当有客户端连接时,返回对应的 Socket 对象
Socket accept = socket.accept();
// 通过 Socket 获取输入流,用于读取客户端发送过来的数据
InputStream inputStream = accept.getInputStream();
// 创建一个字节数组作为缓冲区,用于存储读取到的数据,缓冲区大小为 1024 字节
byte[] buffer = new byte[1024];
// 用于存储每次读取到的字节数
int length;
// 循环读取输入流中的数据,当 read 方法返回 -1 时,表示数据读取完毕(客户端关闭连接等情况)
while ((length = inputStream.read(buffer)) != -1) {
// 将字节数组中从 0 开始,长度为 length 的部分转换为字符串,也就是客户端发送的消息内容
String message = new String(buffer, 0, length);
// 在控制台打印接收到的客户端消息
System.out.println(message);
}
// 当循环结束(客户端断开连接),打印提示信息
System.out.println("客户端断开连接");
}
}
}
中间等待了10s
NioServer.java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* NIO服务器示例
* 基于Java NIO(New IO)实现的非阻塞服务器
* 支持同时处理多个客户端连接,使用Selector实现IO多路复用
*
* @author gongxuanzhangmelt@gmail.com
**/
public class NioServer {
public static void main(String[] args) throws Exception {
// 1. 创建Selector(选择器),用于监听多个通道的IO事件
// Selector是NIO的核心组件,实现了IO多路复用
Selector selector = Selector.open();
// 2. 创建服务器通道(ServerSocketChannel)
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 3. 设置通道为非阻塞模式
// 这是NIO区别于传统IO的关键特性
serverChannel.configureBlocking(false);
// 4. 绑定服务器端口(8080)
serverChannel.bind(new InetSocketAddress(8080));
System.out.println("服务器已启动,监听端口: 8080");
// 5. 向Selector注册服务器通道,并关注"接受连接"事件(OP_ACCEPT)
// 注册后,Selector会监听该通道上的连接请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 服务器主循环,持续处理IO事件
while (true) {
// 阻塞等待,直到至少有一个通道准备好进行IO操作
// 返回值为就绪的通道数量
selector.select();
// 获取所有就绪通道的SelectionKey集合的迭代器
// SelectionKey包含了通道和事件的信息
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍历处理每个就绪的事件
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 移除已处理的SelectionKey,避免重复处理
iterator.remove();
// 7. 处理"接受连接"事件
if (key.isAcceptable()) {
// 从SelectionKey中获取对应的服务器通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 验证获取的通道是否为我们创建的服务器通道(调试用)
System.out.println("接受连接的通道是否为服务器通道: " + (ssc == serverChannel));
// 接受客户端连接,返回客户端通道
// 因为设置了非阻塞模式,这里不会阻塞
SocketChannel client = ssc.accept();
System.out.println("新客户端连接: " + client.getRemoteAddress());
// 将客户端通道设置为非阻塞模式
client.configureBlocking(false);
// 向Selector注册客户端通道,并关注"读"事件(OP_READ)
// 当客户端发送数据时,Selector会通知我们
client.register(selector, SelectionKey.OP_READ);
}
// 8. 处理"读"事件(客户端发送数据)
if (key.isReadable()) {
// 从SelectionKey中获取对应的客户端通道
SocketChannel client = (SocketChannel) key.channel();
// 创建缓冲区,用于读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取客户端发送的数据
// 返回值:-1表示客户端断开连接,其他值表示读取的字节数
int readBytes = client.read(buffer);
if (readBytes == -1) {
// 客户端断开连接
System.out.println(client.getRemoteAddress() + " 断开连接了");
client.close(); // 关闭通道
} else {
// 切换缓冲区为读模式(limit设置为position,position重置为0)
buffer.flip();
// 创建与缓冲区中数据长度相同的字节数组
byte[] bytes = new byte[buffer.remaining()];
// 将缓冲区中的数据读取到字节数组中
buffer.get(bytes);
// 将字节数组转换为字符串并打印
System.out.println("收到来自 " + client.getRemoteAddress() + " 的数据: " + new String(bytes));
// 清空缓冲区,准备下次读取
buffer.clear();
}
}
}
}
}
}
中间没有等待
Selector(选择器):NIO 的核心组件,用于监听多个通道的事件,实现单线程处理多连接
Channel(通道):分为服务器通道(ServerSocketChannel)和客户端通道(SocketChannel)
非阻塞模式:通过configureBlocking(false)设置,使 IO 操作不会阻塞线程
SelectionKey:保存通道和事件的关联信息,用于识别就绪的事件类型
ByteBuffer(缓冲区):NIO 中用于数据读写的容器,需要关注其读写模式的切换
这个 NIO 服务器实现了基本的功能:接受客户端连接并读取客户端发送的数据,相比传统的 BIO(阻塞 IO)模型,它可以用一个线程高效地处理多个客户端连接。
半包和粘包
将buffer改成4,出现半包现象。
因为buffer设置的太小,一次最多读4字节,一次不能完整的读完一句话
// 创建缓冲区,用于读取数据
ByteBuffer buffer = ByteBuffer.allocate(4);
将buffer改成16,出现粘包现象
一次读取,读到了下一条消息
// 创建缓冲区,用于读取数据
ByteBuffer buffer = ByteBuffer.allocate(16);
半包和粘包是tcp面向流而导致的问题,因为我们没有办法控制真正的流读取出多少字节。我们在读buffer时候,并不能保证每次读取到的数据大小是多少,即使设定完美也可能因为网络阻塞等原因,导致每次读取数据的长度不可控。
因此我们需要在客户端和服务端之间制定一些规则(协议),通过这些规则来保证发送消息的双端都知道如何获取一条完整的消息