目录
🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!
🌟了解 Java NIO 请看 : NIO,看完你就懂了!
其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等
如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning
好的,各位旅客请注意,请各位旅客请注意!我们即将从 Buffer
缓冲垫站出发,前往下一站——Java NIO 的交通枢纽核心:Selector
选择器站!请系好安全带,准备体验用一个线程调度万千连接的神奇之旅!🚀
了解其他两个组件可以看:
Channel 组件:【NIO番外篇】之组件 Channel
Buffer 组件:【NIO番外篇】之组件 Buffer
一、Selector:网络世界的“机场管制塔” / “总机接线员” 📡
想象一下,你开了一家超火爆的在线服务(比如猫咪图片直播 🐱),成千上万的用户(Channels
)同时连接进来。如果按照老式做法(阻塞 IO),你得为每个连接都雇一个专属客服(线程)傻等着,用户不说话客服就摸鱼,但工资照发。很快,你的公司(服务器)就会因为人手过多(线程爆炸)而“人”满为患,效率低下,最终破产倒闭。😭
什么是 Selector?
Selector
就是 NIO 请来的 超级调度员!它允许你 用一个线程(一个敬业的管制塔操作员 🧑✈️ 或总机接线员 ☎️)来同时 监控多个通道(Channels) 的状态。哪个通道有“情况”(比如可以读数据了、可以写数据了、有新连接来了),Selector 就会告诉你。
它的作用是什么?
- I/O 多路复用 (Multiplexing):这是它的核心价值!它能让你用 一个单独的线程 管理成百上千的网络连接(Channels)。就像一个管制塔能同时监控空域里的所有飞机 ✈️,而不是每架飞机配一个专属塔台。
- 无敌的可伸缩性 (Scalability):因为它大大减少了所需的线程数量,服务器的资源消耗(内存、CPU 上下文切换开销)急剧下降。你的猫咪图片直播服务可以轻松应对海量用户,走向喵星球巅峰!📈
- 事件驱动 (Event-Driven):你的线程不再需要傻乎乎地挨个问“你好了没?你好了没?”(轮询),也不用死等一个连接(阻塞)。它只需要向 Selector 喊一声:“有情况了叫我!” (
select()
),然后就可以去泡杯咖啡 ☕️ 或者打个盹 😴 (线程等待)。当某个通道真的准备好了,Selector 就会像闹钟一样 🔔 把它叫醒。
二、Selector 的工作流程:塔台是怎么指挥飞机的?
想让 Selector 为你工作,得遵循一套“航空管制条例”:
1. 飞机就位 (准备 Channel):
- 只有
SelectableChannel
的子类(比如SocketChannel
,ServerSocketChannel
等)才能接受 Selector 的调度。 - 最最重要的一步:必须将 Channel 设置为非阻塞模式!
channel.configureBlocking(false);
。否则,管制塔的指令还没发出去,飞机自己就一头扎进云里(阻塞)了,还怎么调度?🤷♂️
2. 向塔台报到 (注册 Channel):
- 你需要告诉 Selector:“嗨,塔台,请帮我留意这架飞机(Channel)!”。使用
channel.register(selector, ops, [attachment]);
selector
: 你要注册到的那个 Selector 实例。ops
: 一个整数,表示你 关心 这架飞机(Channel)的哪些 事件(Interest Operations)。比如:SelectionKey.OP_READ
: 关心它是否可以读数据了 (飞机请求降落许可?)。SelectionKey.OP_WRITE
: 关心它是否可以写数据了 (飞机请求起飞许可?)。SelectionKey.OP_CONNECT
: 关心一个客户端SocketChannel
是否连接成功了 (飞机确认进入管制区?)。SelectionKey.OP_ACCEPT
: 关心一个服务器ServerSocketChannel
是否有新的连接进来了 (有新飞机请求进入空域?)。
你可以用位运算|
组合多个事件,比如OP_READ | OP_WRITE
。
[attachment]
(可选): 你可以给这次注册附加一个“行李牌”🏷️,比如一个包含会话信息或专属 Buffer 的对象,方便后续处理。- 返回值: 这次注册会返回一个
SelectionKey
对象,这是你的登记凭证。
3. 登记凭证 (SelectionKey):
- 每个
SelectionKey
代表了一个 Channel 在一个 Selector 上的注册记录。它像一张详细的“航班信息卡”,包含了:channel()
: 这是哪架飞机 (Channel)?selector()
: 它在哪个塔台 (Selector) 登记的?interestOps()
: 飞行员(你)关心 这架飞机哪些状态?(起飞?降落?)readyOps()
: 塔台(Selector)检测到 这架飞机 当前 已经 准备好 进行哪些操作了?(跑道已清空,可以降落!)这个状态由select()
调用更新。attachment()
: 之前附加的那个“行李牌”🏷️。
4. 塔台开始监控 (调用 select() ):
- 这是 Selector 的核心工作!你的管理线程会调用以下方法之一:
selector.select()
: 阻塞,直到至少有一个你关心的 Channel 准备好了相应的事件,或者其他线程调用了selector.wakeup()
,或者线程被中断。这是最常用的模式。(塔台操作员全神贯注盯着雷达,不等到有情况绝不离开岗位!💪)selector.select(long timeout)
: 最多阻塞timeout
毫秒。超时后即使没事件也会返回。(操作员盯一会儿,到点没情况就去喝口水 ☕️,然后再回来继续盯。)selector.selectNow()
: 非阻塞。立刻返回,不管有没有 Channel 准备好。(操作员快速扫一眼雷达,有情况就报,没情况就拉倒,继续干别的活。)
- 返回值: 这些方法返回一个整数,表示有多少个 Channel 的就绪状态发生了变化(有多少张
SelectionKey
的readyOps
被更新了)。如果是 0,表示没啥新情况。
5. 处理就绪航班 (处理 selectedKeys):
- 当
select()
返回值大于 0 时,说明有“情况”了!你需要:- 调用
selector.selectedKeys()
: 获取一个Set<SelectionKey>
,里面包含了所有当前已就绪的 Channel 的SelectionKey
。(拿到一份“需要立即处理的航班列表”🗒️。) - 遍历这个 Set 中的每一个
SelectionKey
。 - 对于每个
key
,检查它具体是哪个事件就绪了:key.isAcceptable()
: 是不是有新连接可以接受了? (新飞机来了?快引导!)key.isConnectable()
: 是不是连接成功了? (飞机已进入航线?)key.isReadable()
: 是不是可以读数据了? (乘客消息到了?快接收!)key.isWritable()
: 是不是可以写数据了? (有指令要发给飞机?快发送!)
- 根据就绪的事件,执行相应的 非阻塞 I/O 操作(比如
serverChannel.accept()
,socketChannel.read()
,socketChannel.write()
)。 - 最最最最关键的一步 (敲黑板 N 遍 칠판!!!):处理完一个
SelectionKey
后,必须手动将它从selectedKeys
集合中移除!iterator.remove();
或者selector.selectedKeys().remove(key);
。如果你不移除,下次select()
时它还会被包含在内,即使你已经处理过了,会导致你的程序像复读机一样重复处理同一个事件,造成死循环或逻辑混乱! (塔台处理完一架飞机的指令,必须把它从“待办”列表里划掉 ✅,否则会一直骚扰它!)
- 调用
6. 日复一日,年复一年 (循环):
- 上述的
select()
-> 获取selectedKeys
-> 遍历处理 -> 移除key
的过程,通常放在一个while(true)
循环里,让你的单线程调度员持续不断地为人民服务。
上代码!模拟一个简单的回声服务器 📢
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class SelectorEchoServer {
private static final int PORT = 9999;
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) {
System.out.println("启动 Selector 回声服务器在端口 " + PORT + " 📡");
try (Selector selector = Selector.open(); // 打开“塔台”
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) // 打开服务器“机场”
{
serverSocketChannel.bind(new InetSocketAddress(PORT)); // 机场绑定地址和端口
serverSocketChannel.configureBlocking(false); // 关键:机场设置为非阻塞模式!
// 向塔台注册机场,只关心“新飞机抵达”事件 (OP_ACCEPT)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器机场已向塔台报到,等待新连接...");
while (true) { // 开始塔台的 7x24 小时值班
// 1. 等待事件发生(阻塞直到有情况)
int readyChannels = selector.select(); // 等待雷达信号...
if (readyChannels == 0) {
// System.out.println("暂时无事发生..."); // select() 可能因 wakeup() 或超时返回0
continue; // 继续下一轮监控
}
// 2. 获取就绪事件的列表 (拿到需要处理的航班列表)
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
// 3. 遍历处理每个就绪事件
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
try {
// --- 情况 A:有新飞机抵达 (isAcceptable) ---
if (key.isAcceptable()) {
System.out.println("🔔 检测到新连接请求!");
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverChannel.accept(); // 接受连接,得到代表客户端的通道
clientChannel.configureBlocking(false); // 关键:新飞机也设置为非阻塞!
// 将新客户端通道注册到同一个塔台,关心“可以读数据”事件
// 并附加一个 ByteBuffer 作为它的专属“货舱”
clientChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(BUFFER_SIZE));
System.out.println("✅ 新客户端已连接并注册: " + clientChannel.getRemoteAddress());
}
// --- 情况 B:已连接的飞机发来消息 (isReadable) ---
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取附加的“货舱”
try {
int bytesRead = clientChannel.read(buffer); // 从通道读数据到 Buffer
if (bytesRead == -1) {
// 客户端关闭了连接
System.out.println("❌ 客户端断开连接: " + clientChannel.getRemoteAddress());
key.cancel(); // 取消在塔台的注册
clientChannel.close(); // 关闭通道
} else if (bytesRead > 0) {
System.out.println("📨 收到来自 " + clientChannel.getRemoteAddress() + " 的消息 (" + bytesRead + " bytes)");
// 准备回写数据:将 Buffer 从写模式切换到读模式
buffer.flip();
// 将兴趣切换为 OP_WRITE,告诉塔台“我想发消息了”
// 注意:这里直接修改 key 的 interestOps
key.interestOps(SelectionKey.OP_WRITE);
}
} catch (IOException e) {
// 客户端强制关闭等异常
System.err.println("⚠️ 读数据时连接异常: " + clientChannel.getRemoteAddress() + " - " + e.getMessage());
key.cancel();
clientChannel.close();
}
}
// --- 情况 C:可以向飞机发送消息了 (isWritable) ---
// 注意:通常 OP_WRITE 会在缓冲区有空间时一直触发,需要小心处理
if (key.isWritable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取“货舱”
try {
System.out.println("📤 准备向 " + clientChannel.getRemoteAddress() + " 回写数据...");
clientChannel.write(buffer); // 将 Buffer 中的数据写回给客户端
if (!buffer.hasRemaining()) {
// 数据全部写完了!
System.out.println("✅ 数据已全部回写给 " + clientChannel.getRemoteAddress());
// 清空 Buffer,准备下次读取
buffer.clear();
// 将兴趣切换回 OP_READ,告诉塔台“我又可以收消息了”
key.interestOps(SelectionKey.OP_READ);
}
} catch (IOException e) {
System.err.println("⚠️ 写数据时连接异常: " + clientChannel.getRemoteAddress() + " - " + e.getMessage());
key.cancel();
clientChannel.close();
}
}
} catch (IOException e) {
System.err.println("处理 Key 时发生 IO 异常: " + e.getMessage());
// 尝试关闭出问题的 channel
if (key.channel() instanceof SocketChannel) {
try { ((SocketChannel)key.channel()).close();} catch (IOException ioex) {/* ignore */}
}
key.cancel(); // 从 selector 中移除
} finally {
// 4. 关键!关键!关键!处理完后从 selectedKeys 集合中移除当前 Key
keyIterator.remove(); // 划掉!搞定一个!✅
}
} // end of while(keyIterator.hasNext())
} // end of while(true)
} catch (IOException e) {
System.err.println("服务器启动或运行出错: " + e.getMessage());
e.printStackTrace();
}
}
}
运行这个代码,你可以用 telnet localhost 9999
连接上去,输入任何信息,服务器都会原样返回给你!这就是 Selector 的魔力,一个线程处理所有连接!
三、Selector 的常用“指令”小结:
Selector.open()
: 开门营业!建个新塔台。channel.configureBlocking(false)
: 飞机听指挥!必须非阻塞。channel.register(selector, ops, [attachment])
: 登记!告诉塔台你的需求。selector.select() / select(timeout) / selectNow()
: 监听!等待事件发生。selector.selectedKeys()
: 报告!获取就绪事件列表。key.isReadable/Writable/Acceptable/Connectable()
: 识别!判断具体是什么事件。key.channel() / key.attachment()
: 关联!获取对应的飞机和行李牌。key.interestOps(ops)
: 更新!改变你关心的事件。keyIterator.remove()
: 销账!处理完必须移除!(再说亿遍)key.cancel()
: 退役!从塔台注销。selector.wakeup()
: 叫醒!强制select()
返回。selector.close()
: 关门!关闭塔台。
Selector 的智慧:
用最少的人(线程),办最多的事(管理海量连接),这就是 Selector 的核心哲学——I/O 多路复用。掌握了它,你就掌握了构建高性能、高并发网络应用的钥匙🔑!
希望这个机场塔台的故事,能让你对 Selector 这个 NIO 的大总管有更生动、更深刻的理解!现在,你可以自豪地说:“Selector,你的调度逻辑我 get 到了!” 😎