【NIO番外篇】之组件 Selector

发布于:2025-04-15 ⋅ 阅读:(19) ⋅ 点赞:(0)

在这里插入图片描述

🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!

🌟了解 Java NIO 请看 : NIO,看完你就懂了!

其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏已完结)】…等

如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning

好的,各位旅客请注意,请各位旅客请注意!我们即将从 Buffer 缓冲垫站出发,前往下一站——Java NIO 的交通枢纽核心:Selector 选择器站!请系好安全带,准备体验用一个线程调度万千连接的神奇之旅!🚀

了解其他两个组件可以看:
Channel 组件:【NIO番外篇】之组件 Channel
Buffer 组件:【NIO番外篇】之组件 Buffer

一、Selector:网络世界的“机场管制塔” / “总机接线员” 📡

想象一下,你开了一家超火爆的在线服务(比如猫咪图片直播 🐱),成千上万的用户(Channels)同时连接进来。如果按照老式做法(阻塞 IO),你得为每个连接都雇一个专属客服(线程)傻等着,用户不说话客服就摸鱼,但工资照发。很快,你的公司(服务器)就会因为人手过多(线程爆炸)而“人”满为患,效率低下,最终破产倒闭。😭

什么是 Selector?

Selector 就是 NIO 请来的 超级调度员!它允许你 用一个线程(一个敬业的管制塔操作员 🧑‍✈️ 或总机接线员 ☎️)来同时 监控多个通道(Channels) 的状态。哪个通道有“情况”(比如可以读数据了、可以写数据了、有新连接来了),Selector 就会告诉你。
在这里插入图片描述

它的作用是什么?

  1. I/O 多路复用 (Multiplexing):这是它的核心价值!它能让你用 一个单独的线程 管理成百上千的网络连接(Channels)。就像一个管制塔能同时监控空域里的所有飞机 ✈️,而不是每架飞机配一个专属塔台。
  2. 无敌的可伸缩性 (Scalability):因为它大大减少了所需的线程数量,服务器的资源消耗(内存、CPU 上下文切换开销)急剧下降。你的猫咪图片直播服务可以轻松应对海量用户,走向喵星球巅峰!📈
  3. 事件驱动 (Event-Driven):你的线程不再需要傻乎乎地挨个问“你好了没?你好了没?”(轮询),也不用死等一个连接(阻塞)。它只需要向 Selector 喊一声:“有情况了叫我!” (select()),然后就可以去泡杯咖啡 ☕️ 或者打个盹 😴 (线程等待)。当某个通道真的准备好了,Selector 就会像闹钟一样 🔔 把它叫醒。

二、Selector 的工作流程:塔台是怎么指挥飞机的?

想让 Selector 为你工作,得遵循一套“航空管制条例”:

1. 飞机就位 (准备 Channel):

  • 只有 SelectableChannel 的子类(比如 SocketChannel, ServerSocketChannel 等)才能接受 Selector 的调度。
  • 最最重要的一步:必须将 Channel 设置为非阻塞模式channel.configureBlocking(false);。否则,管制塔的指令还没发出去,飞机自己就一头扎进云里(阻塞)了,还怎么调度?🤷‍♂️

2. 向塔台报到 (注册 Channel):

  • 你需要告诉 Selector:“嗨,塔台,请帮我留意这架飞机(Channel)!”。使用 channel.register(selector, ops, [attachment]);
  • selector: 你要注册到的那个 Selector 实例。
  • ops: 一个整数,表示你 关心 这架飞机(Channel)的哪些 事件(Interest Operations)。比如:
    • SelectionKey.OP_READ: 关心它是否可以数据了 (飞机请求降落许可?)。
    • SelectionKey.OP_WRITE: 关心它是否可以数据了 (飞机请求起飞许可?)。
    • SelectionKey.OP_CONNECT: 关心一个客户端 SocketChannel 是否连接成功了 (飞机确认进入管制区?)。
    • SelectionKey.OP_ACCEPT: 关心一个服务器 ServerSocketChannel 是否有新的连接进来了 (有新飞机请求进入空域?)。
      你可以用位运算 | 组合多个事件,比如 OP_READ | OP_WRITE
  • [attachment] (可选): 你可以给这次注册附加一个“行李牌”🏷️,比如一个包含会话信息或专属 Buffer 的对象,方便后续处理。
  • 返回值: 这次注册会返回一个 SelectionKey 对象,这是你的登记凭证。

3. 登记凭证 (SelectionKey):

  • 每个 SelectionKey 代表了一个 Channel 在一个 Selector 上的注册记录。它像一张详细的“航班信息卡”,包含了:
    • channel(): 这是哪架飞机 (Channel)?
    • selector(): 它在哪个塔台 (Selector) 登记的?
    • interestOps(): 飞行员(你)关心 这架飞机哪些状态?(起飞?降落?)
    • readyOps(): 塔台(Selector)检测到 这架飞机 当前 已经 准备好 进行哪些操作了?(跑道已清空,可以降落!)这个状态由 select() 调用更新。
    • attachment(): 之前附加的那个“行李牌”🏷️。

4. 塔台开始监控 (调用 select() ):

  • 这是 Selector 的核心工作!你的管理线程会调用以下方法之一:
    • selector.select(): 阻塞,直到至少有一个你关心的 Channel 准备好了相应的事件,或者其他线程调用了 selector.wakeup(),或者线程被中断。这是最常用的模式。(塔台操作员全神贯注盯着雷达,不等到有情况绝不离开岗位!💪)
    • selector.select(long timeout): 最多阻塞 timeout 毫秒。超时后即使没事件也会返回。(操作员盯一会儿,到点没情况就去喝口水 ☕️,然后再回来继续盯。)
    • selector.selectNow(): 非阻塞。立刻返回,不管有没有 Channel 准备好。(操作员快速扫一眼雷达,有情况就报,没情况就拉倒,继续干别的活。)
  • 返回值: 这些方法返回一个整数,表示有多少个 Channel就绪状态发生了变化(有多少张 SelectionKeyreadyOps 被更新了)。如果是 0,表示没啥新情况。

5. 处理就绪航班 (处理 selectedKeys):

  • select() 返回值大于 0 时,说明有“情况”了!你需要:
    • 调用 selector.selectedKeys(): 获取一个 Set<SelectionKey>,里面包含了所有当前已就绪的 Channel 的 SelectionKey。(拿到一份“需要立即处理的航班列表”🗒️。)
    • 遍历这个 Set 中的每一个 SelectionKey
    • 对于每个 key,检查它具体是哪个事件就绪了:
      • key.isAcceptable(): 是不是有新连接可以接受了? (新飞机来了?快引导!)
      • key.isConnectable(): 是不是连接成功了? (飞机已进入航线?)
      • key.isReadable(): 是不是可以读数据了? (乘客消息到了?快接收!)
      • key.isWritable(): 是不是可以写数据了? (有指令要发给飞机?快发送!)
    • 根据就绪的事件,执行相应的 非阻塞 I/O 操作(比如 serverChannel.accept(), socketChannel.read(), socketChannel.write())。
    • 最最最最关键的一步 (敲黑板 N 遍 칠판!!!)处理完一个 SelectionKey 后,必须手动将它从 selectedKeys 集合中移除! iterator.remove(); 或者 selector.selectedKeys().remove(key);如果你不移除,下次 select() 时它还会被包含在内,即使你已经处理过了,会导致你的程序像复读机一样重复处理同一个事件,造成死循环或逻辑混乱! (塔台处理完一架飞机的指令,必须把它从“待办”列表里划掉 ✅,否则会一直骚扰它!)

6. 日复一日,年复一年 (循环):

  • 上述的 select() -> 获取 selectedKeys -> 遍历处理 -> 移除 key 的过程,通常放在一个 while(true) 循环里,让你的单线程调度员持续不断地为人民服务。

上代码!模拟一个简单的回声服务器 📢

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class SelectorEchoServer {

    private static final int PORT = 9999;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        System.out.println("启动 Selector 回声服务器在端口 " + PORT + " 📡");

        try (Selector selector = Selector.open(); // 打开“塔台”
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) // 打开服务器“机场”
        {

            serverSocketChannel.bind(new InetSocketAddress(PORT)); // 机场绑定地址和端口
            serverSocketChannel.configureBlocking(false); // 关键:机场设置为非阻塞模式!

            // 向塔台注册机场,只关心“新飞机抵达”事件 (OP_ACCEPT)
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器机场已向塔台报到,等待新连接...");

            while (true) { // 开始塔台的 7x24 小时值班
                // 1. 等待事件发生(阻塞直到有情况)
                int readyChannels = selector.select(); // 等待雷达信号...

                if (readyChannels == 0) {
                    // System.out.println("暂时无事发生..."); // select() 可能因 wakeup() 或超时返回0
                    continue; // 继续下一轮监控
                }

                // 2. 获取就绪事件的列表 (拿到需要处理的航班列表)
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

                // 3. 遍历处理每个就绪事件
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();

                    try {
                        // --- 情况 A:有新飞机抵达 (isAcceptable) ---
                        if (key.isAcceptable()) {
                            System.out.println("🔔 检测到新连接请求!");
                            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                            SocketChannel clientChannel = serverChannel.accept(); // 接受连接,得到代表客户端的通道
                            clientChannel.configureBlocking(false); // 关键:新飞机也设置为非阻塞!

                            // 将新客户端通道注册到同一个塔台,关心“可以读数据”事件
                            // 并附加一个 ByteBuffer 作为它的专属“货舱”
                            clientChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(BUFFER_SIZE));
                            System.out.println("✅ 新客户端已连接并注册: " + clientChannel.getRemoteAddress());
                        }

                        // --- 情况 B:已连接的飞机发来消息 (isReadable) ---
                        if (key.isReadable()) {
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取附加的“货舱”

                            try {
                                int bytesRead = clientChannel.read(buffer); // 从通道读数据到 Buffer

                                if (bytesRead == -1) {
                                    // 客户端关闭了连接
                                    System.out.println("❌ 客户端断开连接: " + clientChannel.getRemoteAddress());
                                    key.cancel(); // 取消在塔台的注册
                                    clientChannel.close(); // 关闭通道
                                } else if (bytesRead > 0) {
                                    System.out.println("📨 收到来自 " + clientChannel.getRemoteAddress() + " 的消息 (" + bytesRead + " bytes)");
                                    // 准备回写数据:将 Buffer 从写模式切换到读模式
                                    buffer.flip();
                                    // 将兴趣切换为 OP_WRITE,告诉塔台“我想发消息了”
                                    // 注意:这里直接修改 key 的 interestOps
                                    key.interestOps(SelectionKey.OP_WRITE);
                                }
                            } catch (IOException e) {
                                // 客户端强制关闭等异常
                                System.err.println("⚠️ 读数据时连接异常: " + clientChannel.getRemoteAddress() + " - " + e.getMessage());
                                key.cancel();
                                clientChannel.close();
                            }
                        }

                        // --- 情况 C:可以向飞机发送消息了 (isWritable) ---
                        // 注意:通常 OP_WRITE 会在缓冲区有空间时一直触发,需要小心处理
                        if (key.isWritable()) {
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取“货舱”

                            try {
                                System.out.println("📤 准备向 " + clientChannel.getRemoteAddress() + " 回写数据...");
                                clientChannel.write(buffer); // 将 Buffer 中的数据写回给客户端

                                if (!buffer.hasRemaining()) {
                                    // 数据全部写完了!
                                    System.out.println("✅ 数据已全部回写给 " + clientChannel.getRemoteAddress());
                                    // 清空 Buffer,准备下次读取
                                    buffer.clear();
                                    // 将兴趣切换回 OP_READ,告诉塔台“我又可以收消息了”
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            } catch (IOException e) {
                                System.err.println("⚠️ 写数据时连接异常: " + clientChannel.getRemoteAddress() + " - " + e.getMessage());
                                key.cancel();
                                clientChannel.close();
                            }
                        }
                    } catch (IOException e) {
                        System.err.println("处理 Key 时发生 IO 异常: " + e.getMessage());
                        // 尝试关闭出问题的 channel
                        if (key.channel() instanceof SocketChannel) {
                           try { ((SocketChannel)key.channel()).close();} catch (IOException ioex) {/* ignore */}
                        }
                        key.cancel(); // 从 selector 中移除
                    } finally {
                        // 4. 关键!关键!关键!处理完后从 selectedKeys 集合中移除当前 Key
                        keyIterator.remove(); // 划掉!搞定一个!✅
                    }
                } // end of while(keyIterator.hasNext())
            } // end of while(true)

        } catch (IOException e) {
            System.err.println("服务器启动或运行出错: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

运行这个代码,你可以用 telnet localhost 9999 连接上去,输入任何信息,服务器都会原样返回给你!这就是 Selector 的魔力,一个线程处理所有连接!

三、Selector 的常用“指令”小结:

  • Selector.open(): 开门营业!建个新塔台。
  • channel.configureBlocking(false): 飞机听指挥!必须非阻塞。
  • channel.register(selector, ops, [attachment]): 登记!告诉塔台你的需求。
  • selector.select() / select(timeout) / selectNow(): 监听!等待事件发生。
  • selector.selectedKeys(): 报告!获取就绪事件列表。
  • key.isReadable/Writable/Acceptable/Connectable(): 识别!判断具体是什么事件。
  • key.channel() / key.attachment(): 关联!获取对应的飞机和行李牌。
  • key.interestOps(ops): 更新!改变你关心的事件。
  • keyIterator.remove(): 销账!处理完必须移除!(再说亿遍)
  • key.cancel(): 退役!从塔台注销。
  • selector.wakeup(): 叫醒!强制 select() 返回。
  • selector.close(): 关门!关闭塔台。

Selector 的智慧:

用最少的人(线程),办最多的事(管理海量连接),这就是 Selector 的核心哲学——I/O 多路复用。掌握了它,你就掌握了构建高性能、高并发网络应用的钥匙🔑!

希望这个机场塔台的故事,能让你对 Selector 这个 NIO 的大总管有更生动、更深刻的理解!现在,你可以自豪地说:“Selector,你的调度逻辑我 get 到了!” 😎


网站公告

今日签到

点亮在社区的每一天
去签到