Java网络编程NIO

发布于:2025-04-04 ⋅ 阅读:(31) ⋅ 点赞:(0)

一、NIO是什么?

NIO可以说是比BIO更强大的IO,可以设置非阻塞模式(通过事件的方式监听数据的到来)

  • BIO是基于socket通信,一个线程对应一个socket连接,读取数据要一直等待

  • NIO是基于channel通信,一个线程管理一个selector,一个selector管理多个channel,这样一个线程就能管理多个客户端连接

二、NIO核心组件

 ① Selector:选择器,负责管理多个通道channel,通过监听通道绑定的事件,及时处理通道数据

 ② Channel:通道,一般一端确定,另一端操作缓冲区Buffer。可以是文件通道,网络通道等

 ③ Buffer:缓冲区,包括直接内存缓冲和JVM内存缓冲。直接内存缓冲是在操作系统的非应用程序内存申请的空间,不占用JVM内存;JVM内存缓冲就是在堆空间申请的缓冲区。

   由于IO无法直接操作应用内存,因此需要将应用内存数据拷贝到直接内存,如果使用直接内存,就可以少一次拷贝操作

组件模型图:

三、常用API

(1)缓冲区Buffer

① 介绍

  position:下一次读取或写入的位置

  limit:数据末尾后一个位置

  capacitry:缓冲区容量

② 相关API

以ByteBuffer为例

//	申请操作系统的内存缓冲区
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
//	获取position、limit、capacity
int position = buffer.position();
int limit = buffer.limit();
int capacity = buffer.capacity();

//	向缓冲区中写入数据
String data = "hello, my name is jack!";
buffer.put(data.getBytes());

//	切换模式,标定界限(limit=position,position=0)
buffer.flip();

//	读取缓冲区的数据
int dataLength = buffer.remaining();	//  获取剩余数据量
String content = new String(buffer.array(), 0, dataLength);

//	清空缓冲区,实际上是将各个下标设置为初始值,如:position=0,limit=capacity
buffer.clear();

(2)通道Channel

拷贝文件的案例

public void test02() throws Exception {
        //  利用通道进行文件读写
        String fileName1 = "E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\1.jpg";
        String fileName2 = "E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\2.jpg";
        
        //  打开文件输入流
        FileInputStream fis = new FileInputStream(fileName1);
        //  通过输入流获取通道
        FileChannel fisChannel = fis.getChannel();

        //  打开文件输出流
        FileOutputStream fos = new FileOutputStream(fileName2);
        //  通过输出流获取输出通道
        FileChannel fosChannel = fos.getChannel();

        //  创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        //  将输入通道的数据读取到buffer,然后从buffer读取数据到输出通道
        while ((fisChannel.read(buffer)) > 0 ) {
            //  limit=position,position=0
            buffer.flip();
            //  将数据从buffer写入到文件
            fosChannel.write(buffer);
            //  将各个下标指针归还到最初位置
            buffer.clear();
        }
        
        //  释放通道资源
        fisChannel.close();
        fosChannel.close();
    }

 利用transferFrom实现管道数据转移

public void transferFromTest() throws Exception {
        /*
            利用transferFrom或transferTo实现文件的copy
         */
        FileInputStream fis = new FileInputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\1.jpg");
        FileChannel fisChannel = fis.getChannel();

        FileOutputStream fos = new FileOutputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\11.jpg");
        FileChannel fosChannel = fos.getChannel();

        //  将数据转移到目的通道
        fosChannel.transferFrom(fisChannel , 0, fisChannel.size());

        fisChannel.close();
        fosChannel.close();
    }

利用transferTo实现管道数据转移

public void transferToTest() throws Exception {
        /*
            利用transferFrom或transferTo实现文件的copy
         */

        FileInputStream fis = new FileInputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\1.jpg");
        FileChannel fisChannel = fis.getChannel();

        FileOutputStream fos = new FileOutputStream("E:\\code\\java\\io_learn\\io_02_nio\\src\\main\\resources\\10.jpg");
        FileChannel fosChannel = fos.getChannel();

        //  将数据转移到目的通道
        fisChannel.transferTo(0, fisChannel.size(), fosChannel);

        fisChannel.close();
        fosChannel.close();
    }

注意:transferFrom和transferTo方法一次只能传输2G的数据(受操作系统限制)

(3)选择器Selector

一般是通过服务通道监听端口,等待客户端的连接通道,然后将连接通道注册到Selector,等待事件触发通道行为

public void test() throws Exception {
        //  1.获取通道
        ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();
        //  2。设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //  3.绑定端口
        serverSocketChannel.bind(new InetSocketAddress(9898));
        //  4.获取选择器
        Selector selector = Selector.open();
        //  5.注册通道到选择器,通过客户端建立通道的事件触发
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (selector.select() > 0) {
            //  获取到对应事件,进行处理...

        }
	}

 四、NIO实现聊天室案例

服务端Server

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class Server {

    public static void main(String[] args) throws Exception {
        //  1.创建服务端通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //  2.设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //  3.绑定监听端口
        serverSocketChannel.bind(new InetSocketAddress(9898));
        //  4.打开Selector
        Selector selector =Selector.open();
        //  5.注册监听客户端连接的通道
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //  6.循环监听连接等事件
        while (selector.select() > 0) {
            //  获取事件迭代器
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                //  获取到具体事件
                SelectionKey key = keyIterator.next();
                //  判断事件类型
                //  客户端建立连接的事件
                if (key.isAcceptable()) {
                    //  获取到客户端的连接通道
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //  设置通道为非阻塞模式
                    socketChannel.configureBlocking(false);
                    //  将连接通道注册到选择器
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                //  如果是读取事件,证明客户端发来了消息,进行数据读取
                else if (key.isReadable()) {
                    try {
                        //  创建缓冲区
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        //  消息内容,正常聊天,消息一般很短,不会超过1kb
                        String content = "";
                        //  获取到客户端通道
                        SocketChannel channel = (SocketChannel)key.channel();
                        //  通过通道将数据读取到buffer
                        if (channel.read(byteBuffer) > 0) {
                            //  切换为可读模式
                            byteBuffer.flip();
                            //  将byte[]转为string
                            content = new String(byteBuffer.array(), 0, byteBuffer.remaining());
                            //  清空缓冲区
                            byteBuffer.clear();
                        }

                        //  打印消息内容
                        System.out.println(content);
                        //  将数据写入缓冲区
                        byteBuffer.put(content.getBytes());

                        //  将数据发送给其他所有非自己的客户端
                        Set<SelectionKey> keys = selector.keys();
                        for (SelectionKey selectionKey : keys) {
                            //  通过事件获取到客户端连接通道
                            SelectableChannel selectableChannel = selectionKey.channel();
                            //  如果通道是服务端监听连接的通道ServerSocketChannel或者是自己,则不能转发
                            if (selectableChannel instanceof ServerSocketChannel || selectableChannel == channel) {
                                continue;
                            }
                            //  将消息转发给其他客户端
                            //  转为客户端channel类型
                            SocketChannel otherClientChannel = (SocketChannel) selectableChannel;
                            //  重新设置为读模式
                            byteBuffer.flip();
                            //  发送消息
                            otherClientChannel.write(byteBuffer);
                        }

                    } catch (Exception e) {
                        //  出现异常,说明客户端下线
                        SocketChannel channel = (SocketChannel)key.channel();
                        System.out.println("客户端" + channel.getLocalAddress() + "离线");
                        channel.close();
                        //  释放通道,接触通道绑定的事件
                        key.cancel();
                    }
                }
                //  将使用完的事件进行移除
                keyIterator.remove();
            }
        }

        //  释放资源
        serverSocketChannel.close();
        selector.close();
    }

}

客户端Client

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class Client {

    public static void main(String[] args) throws IOException {
        //  1.创建socket channel
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        socketChannel.configureBlocking(false); //  设置为非阻塞模式
        //  2.开启Selector
        Selector selector = Selector.open();
        //  3.注册通道,监听通道的读事件
        socketChannel.register(selector, SelectionKey.OP_READ);

        //  开启一个接收服务端数据的线程
        new Thread(() -> {
            try {
                //  有事件,则执行处理,没有事件则进行阻塞
                while(selector.select() > 0) {
                    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                    //  遍历事件,找到可读事件,读取服务端发来的数据
                    while (keyIterator.hasNext()) {
                        SelectionKey selectionKey = keyIterator.next();
                        //  可读事件,读取服务端发来的数据
                        if (selectionKey.isReadable()) {
                            //  创建缓冲区
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            //  读取数据到缓冲区
                            socketChannel.read(buffer);
                            //  切换为读模式
                            buffer.flip();
                            System.out.println("\n" + new String(buffer.array(), 0, buffer.remaining()));
                            System.out.print("我的发言: ");
                        }
                        //  使用完毕的事件要进行移除
                        keyIterator.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();


        //  等待客户的输入,聊天控制台
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Scanner sc = new Scanner(System.in);
        while (true) {
            System.out.print("我的发言: ");
            //  读取用户输入
            String msg = socketChannel.getLocalAddress() + ": " + sc.nextLine();
            //  将消息放入缓冲区
            buffer.put(msg.getBytes());
            //  切换为写模式
            buffer.flip();
            //  发送缓冲区的数据给服务吨
            socketChannel.write(buffer);
            //  清除本次数据
            buffer.clear();
        }

    }
}