一、NIO是什么?
NIO可以说是比BIO更强大的IO,可以设置非阻塞模式(通过事件的方式监听数据的到来)
BIO是基于socket通信,一个线程对应一个socket连接,读取数据要一直等待
NIO是基于channel通信,一个线程管理一个selector,一个selector管理多个channel,这样一个线程就能管理多个客户端连接
二、NIO核心组件
① Selector:选择器,负责管理多个通道channel,通过监听通道绑定的事件,及时处理通道数据
② Channel:通道,一般一端确定,另一端操作缓冲区Buffer。可以是文件通道,网络通道等
③ Buffer:缓冲区,包括直接内存缓冲和JVM内存缓冲。直接内存缓冲是在操作系统的非应用程序内存申请的空间,不占用JVM内存;JVM内存缓冲就是在堆空间申请的缓冲区。
由于IO无法直接操作应用内存,因此需要将应用内存数据拷贝到直接内存,如果使用直接内存,就可以少一次拷贝操作
组件模型图:
三、常用API
(1)缓冲区Buffer
① 介绍
position:下一次读取或写入的位置
limit:数据末尾后一个位置
capacitry:缓冲区容量
② 相关API
以ByteBuffer为例
// 申请操作系统的内存缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// 获取position、limit、capacity
int position = buffer.position();
int limit = buffer.limit();
int capacity = buffer.capacity();
// 向缓冲区中写入数据
String data = "hello, my name is jack!";
buffer.put(data.getBytes());
// 切换模式,标定界限(limit=position,position=0)
buffer.flip();
// 读取缓冲区的数据
int dataLength = buffer.remaining(); // 获取剩余数据量
String content = new String(buffer.array(), 0, dataLength);
// 清空缓冲区,实际上是将各个下标设置为初始值,如:position=0,limit=capacity
buffer.clear();
(2)通道Channel
拷贝文件的案例
public void test02() throws Exception {
// 利用通道进行文件读写
String fileName1 = "E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\1.jpg";
String fileName2 = "E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\2.jpg";
// 打开文件输入流
FileInputStream fis = new FileInputStream(fileName1);
// 通过输入流获取通道
FileChannel fisChannel = fis.getChannel();
// 打开文件输出流
FileOutputStream fos = new FileOutputStream(fileName2);
// 通过输出流获取输出通道
FileChannel fosChannel = fos.getChannel();
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 将输入通道的数据读取到buffer,然后从buffer读取数据到输出通道
while ((fisChannel.read(buffer)) > 0 ) {
// limit=position,position=0
buffer.flip();
// 将数据从buffer写入到文件
fosChannel.write(buffer);
// 将各个下标指针归还到最初位置
buffer.clear();
}
// 释放通道资源
fisChannel.close();
fosChannel.close();
}
利用transferFrom实现管道数据转移
public void transferFromTest() throws Exception {
/*
利用transferFrom或transferTo实现文件的copy
*/
FileInputStream fis = new FileInputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\1.jpg");
FileChannel fisChannel = fis.getChannel();
FileOutputStream fos = new FileOutputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\11.jpg");
FileChannel fosChannel = fos.getChannel();
// 将数据转移到目的通道
fosChannel.transferFrom(fisChannel , 0, fisChannel.size());
fisChannel.close();
fosChannel.close();
}
利用transferTo实现管道数据转移
public void transferToTest() throws Exception {
/*
利用transferFrom或transferTo实现文件的copy
*/
FileInputStream fis = new FileInputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\1.jpg");
FileChannel fisChannel = fis.getChannel();
FileOutputStream fos = new FileOutputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\10.jpg");
FileChannel fosChannel = fos.getChannel();
// 将数据转移到目的通道
fisChannel.transferTo(0, fisChannel.size(), fosChannel);
fisChannel.close();
fosChannel.close();
}
注意:transferFrom和transferTo方法一次只能传输2G的数据(受操作系统限制)
(3)选择器Selector
一般是通过服务通道监听端口,等待客户端的连接通道,然后将连接通道注册到Selector,等待事件触发通道行为
public void test() throws Exception {
// 1.获取通道
ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();
// 2。设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3.绑定端口
serverSocketChannel.bind(new InetSocketAddress(9898));
// 4.获取选择器
Selector selector = Selector.open();
// 5.注册通道到选择器,通过客户端建立通道的事件触发
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
// 获取到对应事件,进行处理...
}
}
四、NIO实现聊天室案例
服务端Server
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static void main(String[] args) throws Exception {
// 1.创建服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2.设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3.绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(9898));
// 4.打开Selector
Selector selector =Selector.open();
// 5.注册监听客户端连接的通道
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6.循环监听连接等事件
while (selector.select() > 0) {
// 获取事件迭代器
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
// 获取到具体事件
SelectionKey key = keyIterator.next();
// 判断事件类型
// 客户端建立连接的事件
if (key.isAcceptable()) {
// 获取到客户端的连接通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 设置通道为非阻塞模式
socketChannel.configureBlocking(false);
// 将连接通道注册到选择器
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 如果是读取事件,证明客户端发来了消息,进行数据读取
else if (key.isReadable()) {
try {
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 消息内容,正常聊天,消息一般很短,不会超过1kb
String content = "";
// 获取到客户端通道
SocketChannel channel = (SocketChannel)key.channel();
// 通过通道将数据读取到buffer
if (channel.read(byteBuffer) > 0) {
// 切换为可读模式
byteBuffer.flip();
// 将byte[]转为string
content = new String(byteBuffer.array(), 0, byteBuffer.remaining());
// 清空缓冲区
byteBuffer.clear();
}
// 打印消息内容
System.out.println(content);
// 将数据写入缓冲区
byteBuffer.put(content.getBytes());
// 将数据发送给其他所有非自己的客户端
Set<SelectionKey> keys = selector.keys();
for (SelectionKey selectionKey : keys) {
// 通过事件获取到客户端连接通道
SelectableChannel selectableChannel = selectionKey.channel();
// 如果通道是服务端监听连接的通道ServerSocketChannel或者是自己,则不能转发
if (selectableChannel instanceof ServerSocketChannel || selectableChannel == channel) {
continue;
}
// 将消息转发给其他客户端
// 转为客户端channel类型
SocketChannel otherClientChannel = (SocketChannel) selectableChannel;
// 重新设置为读模式
byteBuffer.flip();
// 发送消息
otherClientChannel.write(byteBuffer);
}
} catch (Exception e) {
// 出现异常,说明客户端下线
SocketChannel channel = (SocketChannel)key.channel();
System.out.println("客户端" + channel.getLocalAddress() + "离线");
channel.close();
// 释放通道,接触通道绑定的事件
key.cancel();
}
}
// 将使用完的事件进行移除
keyIterator.remove();
}
}
// 释放资源
serverSocketChannel.close();
selector.close();
}
}
客户端Client
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class Client {
public static void main(String[] args) throws IOException {
// 1.创建socket channel
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
socketChannel.configureBlocking(false); // 设置为非阻塞模式
// 2.开启Selector
Selector selector = Selector.open();
// 3.注册通道,监听通道的读事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 开启一个接收服务端数据的线程
new Thread(() -> {
try {
// 有事件,则执行处理,没有事件则进行阻塞
while(selector.select() > 0) {
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
// 遍历事件,找到可读事件,读取服务端发来的数据
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
// 可读事件,读取服务端发来的数据
if (selectionKey.isReadable()) {
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取数据到缓冲区
socketChannel.read(buffer);
// 切换为读模式
buffer.flip();
System.out.println("\n" + new String(buffer.array(), 0, buffer.remaining()));
System.out.print("我的发言: ");
}
// 使用完毕的事件要进行移除
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// 等待客户的输入,聊天控制台
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner sc = new Scanner(System.in);
while (true) {
System.out.print("我的发言: ");
// 读取用户输入
String msg = socketChannel.getLocalAddress() + ": " + sc.nextLine();
// 将消息放入缓冲区
buffer.put(msg.getBytes());
// 切换为写模式
buffer.flip();
// 发送缓冲区的数据给服务吨
socketChannel.write(buffer);
// 清除本次数据
buffer.clear();
}
}
}