1. 字段解析
NioEventLoop
类是 Netty 4.1 中实现 Reactor 模型的单线程事件循环类,继承自 SingleThreadEventLoop
,负责处理 I/O 事件、任务调度和事件分发。以下是类中定义的所有字段,逐一解析其作用和实现细节。
1.1 CLEANUP_INTERVAL
定义:
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
初始化:硬编码为 256。
作用:
- 定义清理已取消
SelectionKey
的间隔。 - 在
cancel(SelectionKey)
方法中,当cancelledKeys
达到 256 时,设置needsToSelectAgain = true
,触发selectAgain()
清理Selector
。
- 定义清理已取消
使用场景:
- 在
cancel(SelectionKey)
方法中:cancelledKeys++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; }
- 防止频繁调用
selectAgain()
,优化性能。
- 在
设计理念:
- 硬编码 256 是一个经验值,平衡清理频率和性能开销。
- 避免每次取消
SelectionKey
都调用selectAgain()
,减少系统调用开销。
1.2 DISABLE_KEY_SET_OPTIMIZATION
定义:
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
类型:
boolean
,静态常量。修饰符:
private static final
,不可修改。初始化:通过
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false)
读取系统属性,默认为false
。作用:
- 控制是否禁用
SelectedSelectionKeySet
优化。 - 若为
true
,openSelector()
不替换Selector
的默认selectedKeys
集合,直接使用HashSet
。
- 控制是否禁用
使用场景:
- 在
openSelector()
方法中:if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); }
- 若优化启用(
false
),使用SelectedSelectionKeySet
(基于数组)替换默认HashSet
,提升SelectionKey
处理效率。
- 在
设计理念:
- 提供配置选项,允许用户在调试或兼容性问题时禁用优化。
- 默认启用优化,适用于高性能场景。
1.3 MIN_PREMATURE_SELECTOR_RETURNS
定义:
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
- 初始化:硬编码为 3。
- 作用:
- 定义
Selector.select()
连续过早返回的最小次数阈值,用于检测空轮询问题。 - 在
selectReturnPrematurely
方法中,若selectCnt > MIN_PREMATURE_SELECTOR_RETURNS
,记录调试日志。
- 定义
- 使用场景:
- 在
selectReturnPrematurely
方法中:if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); }
- 在
- 设计理念:
- 设置 3 次作为合理阈值,避免误判正常返回。
- 与
SELECTOR_AUTO_REBUILD_THRESHOLD
配合,检测严重空轮询问题。
1.4 SELECTOR_AUTO_REBUILD_THRESHOLD
定义:
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
static {
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
- 初始化:
- 通过
SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512)
读取系统属性,默认 512。 - 若值小于
MIN_PREMATURE_SELECTOR_RETURNS
(3),设为 0(禁用重建)。
- 通过
- 作用:
- 定义
Selector
连续过早返回的阈值,若selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD
,触发rebuildSelector()
重建Selector
。 - 解决 Java NIO 的空轮询 bug(如 https://github.com/netty/netty/issues/2426)。
- 定义
- 使用场景:
- 在
unexpectedSelectorWakeup
方法中:if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); return true; }
- 在
- 设计理念:
- 默认 512 次是一个经验值,平衡性能和稳定性。
- 提供系统属性配置,允许用户调整或禁用(设为 0)。
1.5 selectNowSupplier
定义:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
- 作用:
- 提供
selector.selectNow()
的封装,供selectStrategy.calculateStrategy
使用。 - 返回就绪的
SelectionKey
数量,决定是否执行非阻塞select
。
- 提供
- 使用场景:
- 在
run()
方法中:strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
- 若
hasTasks()
为true
,selectNowSupplier.get()
调用selectNow()
检查是否有立即就绪的 I/O 事件。
- 在
- 设计理念:
- 抽象
selectNow()
调用,便于SelectStrategy
扩展。 - 异常由
run()
方法捕获,增强鲁棒性。
- 抽象
1.6 selector
定义:
private Selector selector;
- 初始化:
- 在构造函数中通过
openSelector()
初始化:final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector;
- 若启用
SelectedSelectionKeySet
,selector
是SelectedSelectionKeySetSelector
(包装类);否则是原生Selector
。
- 在构造函数中通过
- 作用:
- Java NIO 的核心组件,管理
Channel
的 I/O 事件(如OP_READ
、OP_WRITE
)。 - 用于
select()
、selectNow()
等操作,监控就绪事件。
- Java NIO 的核心组件,管理
- 使用场景:
run()
方法调用select(curDeadlineNanos)
和processSelectedKeys()
。wakeup()
方法调用selector.wakeup()
。rebuildSelector0()
迁移Channel
到新Selector
。
- 设计理念:
- 封装 NIO 的
Selector
,支持优化(如SelectedSelectionKeySet
)。
- 封装 NIO 的
1.7 unwrappedSelector
定义:
private Selector unwrappedSelector;
- 初始化:
- 在
openSelector()
中通过provider.openSelector()
创建:unwrappedSelector = provider.openSelector(); this.unwrappedSelector = selectorTuple.unwrappedSelector;
- 在
- 作用:
- 存储原生 NIO
Selector
,不受SelectedSelectionKeySet
包装。 - 用于直接注册
Channel
或在优化失败时回退。
- 存储原生 NIO
- 使用场景:
register0()
方法注册Channel
:ch.register(unwrappedSelector, interestOps, task);
rebuildSelector0()
迁移Channel
到新Selector
。
- 设计理念:
- 分离原生和包装
Selector
,提高兼容性和灵活性。
- 分离原生和包装
1.8 selectedKeys
定义:
private SelectedSelectionKeySet selectedKeys;
- 初始化:
- 在
openSelector()
中:final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); selectedKeys = selectedKeySet;
- 若优化失败或
DISABLE_KEY_SET_OPTIMIZATION
为true
,设为null
。
- 在
- 作用:
- 存储就绪的
SelectionKey
,使用数组结构替换默认HashSet
。 - 提高处理性能,减少内存分配和迭代开销。
- 存储就绪的
- 使用场景:
- 在
processSelectedKeys()
方法中:if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); }
processSelectedKeysOptimized()
迭代selectedKeys.keys
数组。
- 在
- 设计理念:
- 优化
SelectionKey
处理,适合高吞吐场景。 - 可通过
DISABLE_KEY_SET_OPTIMIZATION
禁用,增强兼容性。
- 优化
1.9 provider
定义:
private final SelectorProvider provider;
- 初始化:
- 通过构造函数传入并检查非空:
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
- 通常为
SelectorProvider.provider()
(默认 NIO 提供者)。
- 通过构造函数传入并检查非空:
- 作用:
- 用于创建
Selector
和Channel
实例,支持 NIO 操作。
- 用于创建
- 使用场景:
- 在
openSelector()
中:unwrappedSelector = provider.openSelector();
- 在
- 设计理念:
- 抽象 NIO 提供者,允许自定义实现(实际很少使用)。
1.10 AWAKE
定义:
private static final long AWAKE = -1L;
初始化:硬编码为
-1L
。作用:
- 表示
nextWakeupNanos
的状态,事件循环已唤醒(不阻塞在select
中)。
- 表示
使用场景:
- 在
nextWakeupNanos
更新中:nextWakeupNanos.lazySet(AWAKE);
- 在
wakeup()
方法中:nextWakeupNanos.getAndSet(AWAKE) != AWAKE
- 在
设计理念:
- 简单常量,表示非阻塞状态。
1.11 NONE
定义:
private static final long NONE = Long.MAX_VALUE;
- 初始化:硬编码为
Long.MAX_VALUE
。 - 作用:
- 表示
nextWakeupNanos
的状态,无定时任务,Selector
可无限期阻塞。
- 表示
- 使用场景:
- 在
run()
方法中:if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; }
- 在
select()
方法中:if (deadlineNanos == NONE) { return selector.select(); }
- 在
- 设计理念:
- 确保无任务时
Selector
无限期阻塞,节省 CPU。
- 确保无任务时
1.12 nextWakeupNanos
定义:
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
- 初始化:设为
AWAKE
(-1L
)。 - 作用:
- 存储
Selector
的下一次唤醒时间(纳秒):AWAKE (-1L)
:事件循环已唤醒。NONE (Long.MAX_VALUE)
:无定时任务。- 其他值:最早定时任务的截止时间。
- 通过
AtomicLong
确保线程安全,支持外部线程唤醒。
- 存储
- 使用场景:
- 在
run()
方法中:nextWakeupNanos.set(curDeadlineNanos); nextWakeupNanos.lazySet(AWAKE);
- 在
wakeup()
方法中:if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) { selector.wakeup(); }
- 在
beforeScheduledTaskSubmitted
和afterScheduledTaskSubmitted
中:return deadlineNanos < nextWakeupNanos.get();
- 在
- 设计理念:
- 替代
wakenUp
(此版本缺失)控制唤醒。 - 使用
lazySet
优化性能,减少内存屏障开销。
- 替代
1.13 selectStrategy
定义:
private final SelectStrategy selectStrategy;
- 初始化:
- 通过构造函数传入:
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
- 通常为
DefaultSelectStrategy
。
- 通过构造函数传入:
- 作用:
- 决定
Selector
的操作策略(select
、selectNow
等)。
- 决定
- 使用场景:
- 在
run()
方法中:strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
- 在
- 设计理念:
- 抽象选择逻辑,支持扩展性。
1.14 ioRatio
定义:
private volatile int ioRatio = 50;
- 类型:
int
,使用volatile
确保线程可见性。 - 修饰符:
private volatile
,实例级别,可修改。 - 初始化:默认 50(50% I/O,50% 任务)。
- 作用:
- 控制 I/O 处理和任务执行的时间比例(1-100)。
- 使用场景:
- 在
run()
方法中:final int ioRatio = this.ioRatio; if (ioRatio == 100) { // ... } else { runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
- 通过
setIoRatio(int)
修改:public void setIoRatio(int ioRatio) { if (ioRatio <= 0 || ioRatio > 100) { throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } this.ioRatio = ioRatio; }
- 在
- 设计理念:
- 平衡 I/O 和任务处理,适配不同负载。
volatile
确保跨线程可见性。
1.15 cancelledKeys
定义:
private int cancelledKeys;
- 初始化:隐式为 0。
- 作用:
- 跟踪已取消的
SelectionKey
数量。 - 当达到
CLEANUP_INTERVAL
(256)时,触发selectAgain()
清理。
- 跟踪已取消的
- 使用场景:
- 在
cancel(SelectionKey)
方法中:cancelledKeys++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; }
- 在
run()
方法中重置:cancelledKeys = 0;
- 在
- 设计理念:
- 防止频繁
selectAgain()
,优化性能。
- 防止频繁
1.16 needsToSelectAgain
定义:
private boolean needsToSelectAgain;
- 初始化:隐式为
false
。 - 作用:
- 指示是否需要调用
selectAgain()
清理已取消的SelectionKey
。
- 指示是否需要调用
- 使用场景:
- 在
cancel(SelectionKey)
中设置:needsToSelectAgain = true;
- 在
processSelectedKeysPlain
和processSelectedKeysOptimized
中检查:if (needsToSelectAgain) { selectAgain(); }
- 在
run()
中重置:needsToSelectAgain = false;
- 在
- 设计理念:
- 确保及时清理已取消的键,避免冗余
select
调用。
- 确保及时清理已取消的键,避免冗余
2. 方法解析
以下是对 NioEventLoop
类中关键方法的详细解析,涵盖逻辑、输入输出、异常处理及与字段的交互。
2.1 构造函数
定义:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory)
- 作用:初始化
NioEventLoop
实例。 - 输入:
parent
:父级NioEventLoopGroup
。executor
:线程执行器(如ThreadPerTaskExecutor
)。selectorProvider
:NIOSelectorProvider
。strategy
:选择策略。rejectedExecutionHandler
:任务队列溢出处理器。taskQueueFactory
,tailTaskQueueFactory
:任务队列工厂。
- 逻辑:
- 调用
SingleThreadEventLoop
构造函数,初始化任务队列。 - 初始化
provider
和selectStrategy
。 - 调用
openSelector()
设置selector
和unwrappedSelector
。
- 调用
- 字段交互:
- 设置
provider
、selectStrategy
、selector
、unwrappedSelector
、selectedKeys
。
- 设置
- 设计理念:确保线程安全初始化,支持自定义配置。
2.2 openSelector
定义:
private SelectorTuple openSelector()
- 作用:创建并配置 NIO
Selector
。 - 输出:
SelectorTuple
,包含unwrappedSelector
和selector
。 - 逻辑:
- 使用
provider.openSelector()
创建unwrappedSelector
。 - 若
DISABLE_KEY_SET_OPTIMIZATION
为false
,通过反射或Unsafe
替换Selector
的selectedKeys
为SelectedSelectionKeySet
。 - 返回优化或默认的
SelectorTuple
。
- 使用
- 字段交互:
- 设置
unwrappedSelector
、selector
、selectedKeys
。
- 设置
- 设计理念:优化
SelectionKey
处理,处理兼容性问题。
2.3 run
定义:
@Override
protected void run()
- 作用:驱动事件循环,处理 I/O 和任务。
- 逻辑:
- 无限循环,使用
selectStrategy
选择select
或selectNow
。 - 使用
nextWakeupNanos
设置select
超时。 - 通过
processSelectedKeys
处理SelectionKey
。 - 根据
ioRatio
调用runAllTasks
执行任务。 - 处理空轮询和关闭逻辑。
- 无限循环,使用
- 字段交互:
- 使用
selectStrategy
、selectNowSupplier
、nextWakeupNanos
、ioRatio
、cancelledKeys
、needsToSelectAgain
、selector
、selectedKeys
。
- 使用
- 设计理念:平衡 I/O 和任务处理,确保鲁棒性。
2.4 select
定义:
private int select(long deadlineNanos) throws IOException
- 作用:执行
Selector
选择操作,支持超时。 - 输入:
deadlineNanos
(下一次唤醒时间)。 - 输出:就绪的
SelectionKey
数量。 - 逻辑:
- 若
deadlineNanos == NONE
,调用selector.select()
(无限阻塞)。 - 否则,计算
timeoutMillis
,调用selector.select(timeoutMillis)
或selectNow()
。
- 若
- 字段交互:使用
selector
。 - 设计理念:根据定时任务优化超时。
2.5 wakeup
定义:
@Override
protected void wakeup(boolean inEventLoop)
- 作用:从其他线程唤醒
Selector
。 - 输入:
inEventLoop
(是否在事件循环线程中)。 - 逻辑:
- 若非事件循环线程且
nextWakeupNanos
非AWAKE
,设置AWAKE
并调用selector.wakeup()
。
- 若非事件循环线程且
- 字段交互:使用
nextWakeupNanos
、selector
。 - 设计理念:线程安全唤醒,避免重复调用。
2.6 processSelectedKeys
定义:
private void processSelectedKeys()
- 作用:处理就绪的
SelectionKey
。 - 逻辑:
- 若
selectedKeys
非空,调用processSelectedKeysOptimized()
。 - 否则,调用
processSelectedKeysPlain(selector.selectedKeys())
。
- 若
- 字段交互:使用
selectedKeys
、selector
。 - 设计理念:优化键处理,支持高吞吐。
2.7 rebuildSelector
定义:
public void rebuildSelector()
- 作用:重建
Selector
,修复空轮询问题。 - 逻辑:
- 通过
openSelector()
创建新Selector
。 - 将所有
Channel
注册到新Selector
。 - 关闭旧
Selector
。
- 通过
- 字段交互:更新
selector
、unwrappedSelector
、selectedKeys
。 - 设计理念:解决 Java NIO bug,确保稳定性。
2.8 其他方法
register
:将SelectableChannel
注册到Selector
。setIoRatio
,getIoRatio
:管理ioRatio
。cancel
:取消SelectionKey
,更新cancelledKeys
。closeAll
:关闭所有注册的Channel
。selectAgain
:立即调用selectNow()
清理键。
3. 源码
以下是 NioEventLoop.java
源码,添加了详细的中文注释,保留原始结构,确保清晰易读。
/**
* {@link SingleThreadEventLoop} 的实现,负责将 {@link Channel} 注册到 Java NIO 的 {@link Selector},
* 在单线程事件循环中进行 I/O 事件多路复用、任务调度和事件分发。每个 NioEventLoop 实例运行在一个独立线程,
* 处理注册在其上的 Channel 的 I/O 事件(如连接、读写)以及提交的任务(普通任务和定时任务)。
*/
public final class NioEventLoop extends SingleThreadEventLoop {
/**
* 日志记录器,供所有 NioEventLoop 实例共享,用于记录调试信息、警告和错误。
* - 初始化:通过 InternalLoggerFactory.getInstance(NioEventLoop.class) 创建,绑定到 NioEventLoop 类。
* - 使用场景:记录 Selector 操作(如重建、异常)、空轮询问题、Channel 注册失败等。
* - 示例:在 rebuildSelector0() 中记录 Selector 重建失败的警告;run() 中记录 CancelledKeyException。
* - 设计理念:统一日志接口,支持用户配置日志输出(如文件、控制台),静态常量减少内存占用。
*/
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
/**
* 定义清理已取消 SelectionKey 的间隔,单位为取消次数。
* - 初始化:硬编码为 256,一个经验值,平衡性能和清理频率。
* - 作用:当 cancelledKeys 达到 256 时,设置 needsToSelectAgain = true,触发 selectAgain() 清理 Selector 中的无效键。
* - 使用场景:在 cancel(SelectionKey) 方法中累加 cancelledKeys 并检查是否达到阈值。
* - 设计理念:避免每次取消键都调用 selectAgain(),减少系统调用开销,提高性能。
* - 注意:硬编码值通常无需调整,适合大多数场景。
*/
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* 控制是否禁用 SelectedSelectionKeySet 优化的标志,通过系统属性配置。
* - 初始化:通过 SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false) 读取系统属性,默认 false(启用优化)。
* - 作用:决定 openSelector() 是否使用 SelectedSelectionKeySet 替换 Selector 的默认 selectedKeys(HashSet)。
* - 使用场景:在 openSelector() 中,若为 true,返回原生 Selector;否则尝试优化。
* - 设计理念:提供配置灵活性,允许用户在调试或兼容性问题时禁用优化,默认启用以提升性能。
* - 性能影响:优化后的 SelectedSelectionKeySet 使用数组实现,减少内存分配和迭代开销。
*/
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
/**
* Selector.select() 连续过早返回的最小次数阈值,用于触发调试日志。
* - 初始化:硬编码为 3,一个经验值,避免误判正常返回。
* - 作用:在 selectReturnPrematurely() 中,若 selectCnt > 3,记录调试日志,提示可能的空轮询问题。
* - 使用场景:在 run() 方法中检测 Selector 行为异常。
* - 设计理念:与 SELECTOR_AUTO_REBUILD_THRESHOLD 配合,区分轻微和严重空轮询问题。
*/
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
/**
* Selector 连续过早返回的重建阈值,用于触发 Selector 重建以解决空轮询 bug。
* - 初始化:通过 SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512) 读取系统属性,默认 512。
* 若值小于 MIN_PREMATURE_SELECTOR_RETURNS(3),设为 0(禁用重建)。
* - 作用:当 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 时,调用 rebuildSelector() 重建 Selector。
* - 使用场景:在 unexpectedSelectorWakeup() 中检查空轮询问题。
* - 设计理念:解决 Java NIO 的空轮询 bug(如 https://github.com/netty/netty/issues/2426),
* 默认 512 次是经验值,平衡性能和稳定性;支持用户通过系统属性调整或禁用。
* - 性能影响:重建 Selector 涉及 Channel 迁移,需谨慎设置阈值。
*/
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
/**
* 静态初始化块,处理 JDK NIO bug 和配置重建阈值。
* - 针对 Java 6 的 NIO bug(如 JDK-6427854),设置 sun.nio.ch.bugLevel 属性。
* - 初始化 SELECTOR_AUTO_REBUILD_THRESHOLD,确保不低于 MIN_PREMATURE_SELECTOR_RETURNS。
* - 记录调试日志,输出 DISABLE_KEY_SET_OPTIMIZATION 和 SELECTOR_AUTO_REBUILD_THRESHOLD 的值。
* - 设计理念:增强兼容性,确保 Netty 在不同 Java 版本下稳定运行。
*/
static {
if (PlatformDependent.javaVersion() < 7) {
final String key = "sun.nio.ch.bugLevel";
final String bugLevel = SystemPropertyUtil.get(key);
if (bugLevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("无法获取/设置系统属性: " + key, e);
}
}
}
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}
/**
* 提供 selectNow() 的函数式接口封装,用于 selectStrategy 判断。
* - 初始化:匿名类实现,调用 selectNow() 方法返回就绪的 SelectionKey 数量。
* - 作用:供 selectStrategy.calculateStrategy() 使用,检查是否有立即就绪的 I/O 事件。
* - 使用场景:在 run() 方法中,若 hasTasks() 为 true,调用 selectNowSupplier.get() 执行非阻塞 select。
* - 设计理念:抽象 selectNow() 调用,方便扩展 selectStrategy;异常由 run() 捕获,增强鲁棒性。
* - 性能优化:避免直接调用 selectNow(),提高代码可维护性。
*/
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
/**
* NIO Selector,负责 I/O 事件多路复用。
* - 初始化:在构造函数中通过 openSelector() 设置,可能为:
* - SelectedSelectionKeySetSelector(优化后的包装类,若启用 SelectedSelectionKeySet)。
* - 原生 Selector(若禁用优化或优化失败)。
* - 作用:监控注册 Channel 的 I/O 事件(如 OP_READ、OP_WRITE、OP_ACCEPT)。
* - 使用场景:
* - run() 方法中调用 select() 和 selectNow()。
* - processSelectedKeys() 处理就绪的 SelectionKey。
* - wakeup() 调用 selector.wakeup() 唤醒阻塞。
* - rebuildSelector0() 迁移到新 Selector。
* - 设计理念:封装 NIO Selector,支持优化(如 SelectedSelectionKeySet)和重建,适应高并发场景。
* - 线程安全:仅由事件循环线程访问,无需同步。
*/
private Selector selector;
/**
* 原生 NIO Selector,未经 SelectedSelectionKeySet 优化。
* - 初始化:在 openSelector() 中通过 provider.openSelector() 创建。
* - 作用:
* - 用于直接注册 Channel(register0() 方法)。
* - 优化失败或禁用时作为 selector 的回退。
* - 在 Selector 重建时迁移 Channel。
* - 使用场景:
* - register0() 中注册 Channel。
* - rebuildSelector0() 迁移 Channel 注册。
* - 设计理念:分离原生和优化 Selector,增强兼容性和灵活性。
* - 线程安全:仅由事件循环线程访问。
*/
private Selector unwrappedSelector;
/**
* 基于数组的就绪 SelectionKey 集合,用于优化 Selector 性能。
* - 初始化:在 openSelector() 中创建,若优化失败或禁用则为 null。
* - 作用:替换 Selector 的默认 selectedKeys(HashSet),使用数组存储就绪 SelectionKey,减少内存分配和迭代开销。
* - 使用场景:
* - processSelectedKeys() 中若 selectedKeys 非空,调用 processSelectedKeysOptimized()。
* - processSelectedKeysOptimized() 迭代 selectedKeys.keys 数组。
* - 设计理念:优化高吞吐场景的 SelectionKey 处理,降低 CPU 和内存开销;可通过 DISABLE_KEY_SET_OPTIMIZATION 禁用。
* - 性能优化:数组比 HashSet 迭代更快,适合高并发 I/O 处理。
*/
private SelectedSelectionKeySet selectedKeys;
/**
* NIO SelectorProvider,用于创建 Selector 和 Channel。
* - 初始化:在构造函数中传入,确保非空(ObjectUtil.checkNotNull)。
* - 作用:提供 NIO 核心组件的创建接口,通常为 SelectorProvider.provider()(默认实现)。
* - 使用场景:在 openSelector() 中创建 unwrappedSelector。
* - 设计理念:抽象 NIO 提供者,支持自定义实现(实际很少使用),确保兼容性。
* - 线程安全:SelectorProvider 本身线程安全,provider 仅用于初始化。
*/
private final SelectorProvider provider;
/**
* 表示事件循环已唤醒(不阻塞在 select 中)的常量。
* - 初始化:硬编码为 -1L。
* - 作用:用于 nextWakeupNanos,表示 Selector 已唤醒,不需阻塞。
* - 使用场景:
* - run() 中通过 nextWakeupNanos.lazySet(AWAKE) 标记唤醒状态。
* - wakeup() 中通过 nextWakeupNanos.getAndSet(AWAKE) 检查状态。
* - 设计理念:简单常量,清晰表示非阻塞状态,优化唤醒逻辑。
*/
private static final long AWAKE = -1L;
/**
* 表示无定时任务,Selector 可无限期阻塞的常量。
* - 初始化:硬编码为 Long.MAX_VALUE。
* - 作用:用于 nextWakeupNanos,表示无定时任务,select() 可无限阻塞。
* - 使用场景:
* - run() 中若无定时任务,设置 curDeadlineNanos = NONE。
* - select() 中若 deadlineNanos == NONE,调用 selector.select()。
* - 设计理念:确保无任务时最小化 CPU 使用,优化阻塞行为。
*/
private static final long NONE = Long.MAX_VALUE;
/**
* 存储 Selector 下一次唤醒时间(纳秒),控制 select 超时和唤醒逻辑。
* - 初始化:设为 AWAKE(-1L),表示初始为唤醒状态。
* - 作用:
* - 表示三种状态:
* - AWAKE (-1L):事件循环已唤醒。
* - NONE (Long.MAX_VALUE):无定时任务,select 可无限阻塞。
* - 其他值:最早定时任务的截止时间(纳秒)。
* - 控制 select() 的超时时间,协调 I/O 和定时任务。
* - 支持外部线程通过 wakeup() 唤醒 Selector。
* - 使用场景:
* - run() 中设置 nextWakeupNanos.set(curDeadlineNanos) 和 lazySet(AWAKE)。
* - wakeup() 中通过 getAndSet(AWAKE) 判断是否需要唤醒。
* - beforeScheduledTaskSubmitted() 和 afterScheduledTaskSubmitted() 检查定时任务是否需要更早唤醒。
* - 设计理念:
* - 使用 AtomicLong 确保线程安全,允许外部线程安全修改。
* - lazySet(AWAKE) 优化性能,减少内存屏障开销。
* - 替代 Netty 早期版本的 wakenUp 字段,简化状态管理。
* - 性能优化:纳秒精度支持精确的任务调度,lazySet 减少同步开销。
*/
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
/**
* 选择策略,决定 Selector 的操作方式(如 select 或 selectNow)。
* - 初始化:在构造函数中传入,确保非空(ObjectUtil.checkNotNull),通常为 DefaultSelectStrategy。
* - 作用:根据任务队列状态和 selectNowSupplier 决定执行阻塞 select、非阻塞 selectNow 或其他操作。
* - 使用场景:在 run() 中调用 calculateStrategy() 确定策略。
* - 设计理念:抽象选择逻辑,支持扩展(如支持 epoll 或其他模型),提高灵活性。
*/
private final SelectStrategy selectStrategy;
/**
* I/O 处理与任务执行的时间比例,控制事件循环的资源分配。
* - 初始化:默认 50,表示 50% 时间用于 I/O,50% 用于任务。
* - 作用:范围 1-100,决定 processSelectedKeys() 和 runAllTasks() 的时间分配。
* - 使用场景:
* - run() 中根据 ioRatio 决定任务执行时间。
* - setIoRatio() 和 getIoRatio() 提供外部配置接口。
* - 设计理念:
* - 平衡 I/O 和任务处理,适配不同负载(如 I/O 密集或任务密集)。
* - volatile 确保多线程修改可见性。
* - 性能优化:动态调整比例,优化吞吐量和延迟。
*/
private volatile int ioRatio = 50;
/**
* 跟踪已取消的 SelectionKey 数量,用于触发清理。
* - 初始化:隐式为 0。
* - 作用:记录 cancel(SelectionKey) 调用次数,当达到 CLEANUP_INTERVAL(256)时,触发 selectAgain()。
* - 使用场景:
* - cancel() 中累加 cancelledKeys。
* - run() 中重置为 0。
* - 设计理念:延迟清理,减少 selectAgain() 调用频率,优化性能。
* - 线程安全:仅由事件循环线程修改,无需同步。
*/
private int cancelledKeys;
/**
* 指示是否需要调用 selectAgain() 清理已取消的 SelectionKey。
* - 初始化:隐式为 false。
* - 作用:当 cancelledKeys 达到 CLEANUP_INTERVAL 或其他条件时,设为 true,触发清理。
* - 使用场景:
* - cancel() 中设置 needsToSelectAgain = true。
* - processSelectedKeysPlain() 和 processSelectedKeysOptimized() 检查并调用 selectAgain()。
* - run() 中重置为 false。
* - 设计理念:确保及时清理无效键,避免 Selector 状态不一致,同时优化调用频率。
*/
private boolean needsToSelectAgain;
/**
* 构造函数,初始化 NioEventLoop 实例。
* - 参数:
* - parent:父级 NioEventLoopGroup,管理多个 EventLoop。
* - executor:线程执行器,通常为 ThreadPerTaskExecutor,运行事件循环。
* - selectorProvider:NIO SelectorProvider,创建 Selector 和 Channel。
* - strategy:选择策略,决定 select 操作。
* - rejectedExecutionHandler:任务队列溢出处理器。
* - taskQueueFactory:主任务队列工厂。
* - tailTaskQueueFactory:尾部任务队列工厂。
* - 逻辑:
* - 调用父类 SingleThreadEventLoop 构造函数,初始化任务队列。
* - 检查并设置 provider 和 selectStrategy。
* - 调用 openSelector() 初始化 selector 和 unwrappedSelector。
* - 字段交互:设置 provider、selectStrategy、selector、unwrappedSelector、selectedKeys。
* - 异常处理:通过 ObjectUtil.checkNotNull 确保参数非空。
* - 设计理念:确保线程安全初始化,支持自定义配置,适应不同场景。
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
/**
* 创建任务队列,若工厂为 null 则使用默认 MpscQueue。
* - 参数:queueFactory,任务队列工厂。
* - 返回:Queue<Runnable>,任务队列。
* - 逻辑:若 factory 为 null,调用 newTaskQueue0() 创建 MpscQueue;否则使用 factory 创建。
* - 设计理念:支持自定义队列实现,默认使用高性能 MpscQueue,适合单线程模型。
*/
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
/**
* 内部方法,创建指定容量的 MpscQueue。
* - 参数:maxPendingTasks,最大任务容量。
* - 返回:Queue<Runnable>,MpscQueue 实例。
* - 逻辑:根据 maxPendingTasks 使用 PlatformDependent 创建无界或有界队列。
* - 设计理念:MpscQueue 优化单生产者多消费者场景,适合事件循环。
*/
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
/**
* Selector 元组类,存储原生和优化后的 Selector。
* - 字段:
* - unwrappedSelector:原生 Selector。
* - selector:优化后的 Selector 或原生 Selector。
* - 构造函数:支持单一 Selector 或原生+优化 Selector。
* - 设计理念:封装 Selector 对,简化 openSelector() 的返回值。
*/
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
/**
* 创建并配置 Selector,支持 SelectedSelectionKeySet 优化。
* - 返回:SelectorTuple,包含 unwrappedSelector 和 selector。
* - 逻辑:
* - 使用 provider.openSelector() 创建 unwrappedSelector。
* - 若 DISABLE_KEY_SET_OPTIMIZATION 为 true,返回原生 Selector。
* - 否则,通过反射或 Unsafe 替换 Selector 的 selectedKeys 和 publicSelectedKeys 为 SelectedSelectionKeySet。
* - 若优化失败,selectedKeys 设为 null,回退到原生 Selector。
* - 字段交互:设置 unwrappedSelector、selector、selectedKeys。
* - 异常处理:捕获 IOException、NoSuchFieldException、IllegalAccessException,记录日志并回退。
* - 设计理念:优化 SelectionKey 处理,减少内存分配;支持兼容性回退。
* - 性能优化:SelectedSelectionKeySet 使用数组,显著提高高并发性能。
*/
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("无法打开新的 Selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("无法将特殊 java.util.Set 注入到: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("无法将特殊 java.util.Set 注入到: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("成功将特殊 java.util.Set 注入到: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
/**
* 返回此 NioEventLoop 使用的 SelectorProvider。
* - 逻辑:直接返回 provider 字段。
* - 设计理念:提供访问接口,便于外部获取 NIO 提供者。
*/
public SelectorProvider selectorProvider() {
return provider;
}
/**
* 创建新的任务队列,覆盖父类方法。
* - 逻辑:调用 newTaskQueue0() 创建 MpscQueue。
* - 设计理念:支持自定义容量,优化任务存储。
*/
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return newTaskQueue0(maxPendingTasks);
}
/**
* 将 SelectableChannel 注册到 Selector,指定兴趣操作和任务。
* - 参数:
* - ch:SelectableChannel,如 SocketChannel 或 ServerSocketChannel。
* - interestOps:兴趣操作(如 OP_READ、OP_WRITE)。
* - task:NioTask,处理 Channel 事件。
* - 逻辑:
* - 检查参数非空和有效性。
* - 若已关闭,抛出 IllegalStateException。
* - 若在事件循环线程,直接调用 register0();否则提交任务到事件循环。
* - 异常处理:捕获 InterruptedException,恢复线程中断状态。
* - 设计理念:确保注册操作线程安全,统一在事件循环线程执行。
*/
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
ObjectUtil.checkNotNull(ch, "ch");
if (interestOps == 0) {
throw new IllegalArgumentException("interestOps 必须非零。");
}
if ((interestOps & ~ch.validOps()) != 0) {
throw new IllegalArgumentException(
"无效的 interestOps: " + interestOps + "(有效值: " + ch.validOps() + ')');
}
ObjectUtil.checkNotNull(task, "task");
if (isShutdown()) {
throw new IllegalStateException("事件循环已关闭");
}
if (inEventLoop()) {
register0(ch, interestOps, task);
} else {
try {
submit(new Runnable() {
@Override
public void run() {
register0(ch, interestOps, task);
}
}).sync();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
/**
* 内部方法,执行 Channel 注册。
* - 参数:ch、interestOps、task,与 register() 相同。
* - 逻辑:调用 ch.register(unwrappedSelector, interestOps, task) 注册 Channel。
* - 异常处理:捕获 Exception,抛出 EventLoopException。
* - 设计理念:使用 unwrappedSelector 确保注册不受优化影响,保持兼容性。
*/
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
try {
ch.register(unwrappedSelector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("无法注册 Channel", e);
}
}
/**
* 获取当前的 I/O 比例。
* - 返回:ioRatio,1-100。
* - 逻辑:直接返回 ioRatio 字段。
* - 设计理念:提供外部访问接口,便于监控和调整。
*/
public int getIoRatio() {
return ioRatio;
}
/**
* 设置 I/O 比例,控制 I/O 和任务的时间分配。
* - 参数:ioRatio,1-100。
* - 逻辑:检查范围有效性,更新 ioRatio。
* - 异常处理:抛出 IllegalArgumentException 若范围无效。
* - 设计理念:支持动态调整,适配不同负载场景。
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (预期: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}
/**
* 重建 Selector,解决空轮询问题。
* - 逻辑:
* - 若不在事件循环线程,提交任务到事件循环。
* - 否则直接调用 rebuildSelector0()。
* - 设计理念:确保线程安全,统一在事件循环线程执行重建。
*/
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
/**
* 返回注册的 Channel 数量。
* - 返回:selector.keys().size() - cancelledKeys。
* - 逻辑:计算当前有效 Channel 数量,排除已取消的键。
* - 设计理念:提供监控接口,方便外部了解事件循环负载。
*/
@Override
public int registeredChannels() {
return selector.keys().size() - cancelledKeys;
}
/**
* 返回注册 Channel 的迭代器。
* - 返回:Iterator<Channel>,遍历有效 Channel。
* - 逻辑:
* - 若 selector.keys() 为空,返回空迭代器。
* - 否则创建迭代器,过滤有效 SelectionKey 并提取 AbstractNioChannel。
* - 设计理念:支持外部遍历 Channel,优化空集合处理。
*/
@Override
public Iterator<Channel> registeredChannelsIterator() {
assert inEventLoop();
final Set<SelectionKey> keys = selector.keys();
if (keys.isEmpty()) {
return ChannelsReadOnlyIterator.empty();
}
return new Iterator<Channel>() {
final Iterator<SelectionKey> selectionKeyIterator = ObjectUtil.checkNotNull(keys, "selectionKeys").iterator();
Channel next;
boolean isDone;
@Override
public boolean hasNext() {
if (isDone) {
return false;
}
Channel cur = next;
if (cur == null) {
cur = next = nextOrDone();
return cur != null;
}
return true;
}
@Override
public Channel next() {
if (isDone) {
throw new NoSuchElementException();
}
Channel cur = next;
if (cur == null) {
cur = nextOrDone();
if (cur == null) {
throw new NoSuchElementException();
}
}
next = nextOrDone();
return cur;
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove");
}
private Channel nextOrDone() {
Iterator<SelectionKey> it = selectionKeyIterator;
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isValid()) {
Object attachment = key.attachment();
if (attachment instanceof AbstractNioChannel) {
return (AbstractNioChannel) attachment;
}
}
}
isDone = true;
return null;
}
};
}
/**
* 内部方法,执行 Selector 重建。
* - 逻辑:
* - 创建新 Selector(newSelectorTuple)。
* - 遍历旧 Selector 的键,迁移有效 Channel 到新 Selector。
* - 更新 selector 和 unwrappedSelector。
* - 关闭旧 Selector。
* - 字段交互:更新 selector、unwrappedSelector、selectedKeys。
* - 异常处理:
* - 捕获创建 Selector 的异常,记录警告。
* - 捕获迁移 Channel 的异常,关闭失败的 Channel。
* - 捕获关闭旧 Selector 的异常,记录警告。
* - 设计理念:解决 Java NIO 空轮询 bug,确保事件循环稳定性。
*/
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("无法创建新的 Selector。", e);
return;
}
int nChannels = 0;
for (SelectionKey key : oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels++;
} catch (Exception e) {
logger.warn("无法将 Channel 重新注册到新 Selector。", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("无法关闭旧 Selector。", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("成功迁移 {} 个 Channel 到新 Selector。", nChannels);
}
}
/**
* 核心事件循环方法,驱动 I/O 事件和任务处理。
* - 逻辑:
* - 无限循环,调用 selectStrategy.calculateStrategy() 决定 select 操作。
* - 根据 nextWakeupNanos 设置 select 超时,处理 I/O 事件。
* - 根据 ioRatio 分配时间,调用 processSelectedKeys() 和 runAllTasks()。
* - 检测空轮询问题(selectReturnPrematurely 和 unexpectedSelectorWakeup)。
* - 处理关闭逻辑(isShuttingDown、closeAll、confirmShutdown)。
* - 字段交互:使用 selectStrategy、selectNowSupplier、nextWakeupNanos、ioRatio、cancelledKeys、needsToSelectAgain、selector、selectedKeys。
* - 异常处理:
* - 捕获 IOException,重建 Selector。
* - 捕获 CancelledKeyException,记录日志。
* - 捕获 Error,直接抛出。
* - 捕获其他 Throwable,调用 handleLoopException。
* - 设计理念:实现 Reactor 模型,平衡 I/O 和任务处理,优化性能和稳定性。
*/
@Override
protected void run() {
int selectCnt = 0; // 跟踪 select() 调用次数,检测空轮询。
for (;;) {
try {
int strategy;
try {
// 根据任务队列和 selectNowSupplier 决定选择策略。
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO 不支持忙等待,退回到 SELECT。
case SelectStrategy.SELECT:
// 获取最早定时任务的截止时间。
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // 无定时任务。
}
// 设置 Selector 下一次唤醒时间。
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 若无任务,执行带超时的 select。
strategy = select(curDeadlineNanos);
}
} finally {
// 标记事件循环为唤醒状态,防止不必要唤醒。
nextWakeupNanos.lazySet(AWAKE);
}
// 继续执行。
default:
}
} catch (IOException e) {
// Selector 异常时重建,重置 selectCnt。
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // 无 I/O 事件时运行最少任务。
}
if (selectReturnPrematurely(selectCnt, ranTasks, strategy)) {
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " 由 Selector 引发 {} - JDK bug?", selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
/**
* 检查 select 是否过早返回,若是则重置 selectCnt。
* - 参数:selectCnt(select 调用次数),ranTasks(是否执行了任务),strategy(选择策略)。
* - 返回:boolean,是否过早返回。
* - 逻辑:若 ranTasks 或 strategy > 0,记录调试日志(若 selectCnt > MIN_PREMATURE_SELECTOR_RETURNS),返回 true。
* - 设计理念:检测潜在空轮询问题,触发日志便于调试。
*/
private boolean selectReturnPrematurely(int selectCnt, boolean ranTasks, int strategy) {
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() 连续 {} 次过早返回,Selector: {}.", selectCnt - 1, selector);
}
return true;
}
return false;
}
/**
* 检测意外的 Selector 唤醒,必要时重建。
* - 参数:selectCnt,select 调用次数。
* - 返回:boolean,是否触发重建。
* - 逻辑:
* - 若线程被中断,返回 true。
* - 若 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD,触发 rebuildSelector()。
* - 设计理念:解决 Java NIO 空轮询 bug,确保稳定性。
*/
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() 因线程中断过早返回...");
}
return true;
}
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn("Selector.select() 连续 {} 次过早返回,正在重建 Selector {}.", selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}
/**
* 处理事件循环中的意外异常,休眠 1 秒防止 CPU 过载。
* - 参数:t,异常。
* - 逻辑:记录警告日志,休眠 1 秒,忽略 InterruptedException。
* - 设计理念:防止异常导致事件循环高 CPU 占用,确保稳定性。
*/
private static void handleLoopException(Throwable t) {
logger.warn("选择器循环中发生意外异常。", t);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// 忽略。
}
}
/**
* 处理就绪的 SelectionKey,使用优化或默认实现。
* - 逻辑:
* - 若 selectedKeys 非空,调用 processSelectedKeysOptimized()。
* - 否则调用 processSelectedKeysPlain(selector.selectedKeys())。
* - 字段交互:使用 selectedKeys、selector。
* - 设计理念:根据优化状态选择高效处理方式,支持高吞吐。
*/
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
/**
* 清理资源,关闭 Selector。
* - 逻辑:调用 selector.close(),捕获 IOException 并记录警告。
* - 设计理念:确保事件循环关闭时释放资源。
*/
@Override
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn("无法关闭 Selector。", e);
}
}
/**
* 取消 SelectionKey,跟踪取消数量以触发清理。
* - 参数:key,SelectionKey。
* - 逻辑:
* - 调用 key.cancel()。
* - 累加 cancelledKeys,若达到 CLEANUP_INTERVAL,设置 needsToSelectAgain。
* - 设计理念:延迟清理,优化性能。
*/
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
/**
* 使用默认 HashSet 处理 SelectionKey。
* - 参数:selectedKeys,Selector 的就绪键集合。
* - 逻辑:
* - 遍历 selectedKeys,处理每个键(AbstractNioChannel 或 NioTask)。
* - 若 needsToSelectAgain 为 true,调用 selectAgain() 清理。
* - 设计理念:兼容默认 Selector 实现,处理非优化场景。
*/
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
/**
* 使用优化数组实现处理 SelectionKey。
* - 逻辑:
* - 遍历 selectedKeys.keys 数组,处理每个键。
* - 清空已处理键,检查 needsToSelectAgain。
* - 设计理念:利用数组高效迭代,优化高并发性能。
*/
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
/**
* 处理单个 SelectionKey 的 Channel 事件。
* - 参数:k(SelectionKey),ch(AbstractNioChannel)。
* - 逻辑:
* - 检查键有效性,无效则关闭 Channel。
* - 处理 OP_CONNECT、OP_WRITE、OP_READ/OP_ACCEPT 事件。
* - 异常处理:捕获 CancelledKeyException,关闭 Channel。
* - 设计理念:高效分发 I/O 事件,确保 Channel 状态一致。
*/
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
/**
* 处理 NioTask 的 SelectionKey 事件。
* - 参数:k(SelectionKey),task(NioTask)。
* - 逻辑:
* - 调用 task.channelReady() 处理事件。
* - 若异常,取消键并调用 channelUnregistered。
* - 确保无效键触发 channelUnregistered。
* - 设计理念:支持自定义 NioTask,扩展事件处理。
*/
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) {
invokeChannelUnregistered(task, k, null);
}
break;
default:
break;
}
}
}
/**
* 关闭所有注册的 Channel。
* - 逻辑:
* - 调用 selectAgain() 清理键。
* - 遍历 selector.keys(),关闭 AbstractNioChannel 或调用 NioTask 的 channelUnregistered。
* - 设计理念:确保事件循环关闭时清理所有资源。
*/
private void closeAll() {
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k : keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch : channels) {
ch.unsafe().close(ch.unsafe().void