Java 中断(Interrupt)机制详解
Java 的中断机制是一种协作式的线程间通信机制,用于请求另一个线程停止当前正在执行的操作。
Thread thread = Thread.currentThread();
thread.interrupt(); // 设置当前线程的中断状态
检查中断状态
// 检查中断状态
boolean isInterrupted = Thread.currentThread().isInterrupted();
// 检查并清除中断状态
boolean wasInterrupted = Thread.interrupted();
中断产生的后果
对阻塞方法的影响
当线程在以下阻塞方法中被中断时,会抛出 InterruptedException
:
try {
Thread.sleep(1000); // sleep
object.wait(); // wait
thread.join(); // join
LockSupport.park(); // park
// 以及各种 I/O 操作和同步器
} catch (InterruptedException e) {
// 中断异常处理
Thread.currentThread().interrupt(); // 恢复中断状态
}
对非阻塞代码的影响
对于非阻塞代码,中断不会自动产生异常,需要手动检查:
while (!Thread.currentThread().isInterrupted()) {
// 执行任务
System.out.println("Working...");
// 模拟工作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// 睡眠时被中断
Thread.currentThread().interrupt(); // 重新设置中断状态
break;
}
}
System.out.println("线程被中断,优雅退出");
正确的中断处理模式
1. 传播中断状态
public void task() {
try {
while (true) {
// 执行工作
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
// 或者检查中断的阻塞操作
Thread.sleep(100);
}
} catch (InterruptedException e) {
// 恢复中断状态并退出
Thread.currentThread().interrupt();
}
}
2. 不可中断任务的处理
public void nonInterruptibleTask() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
// 执行清理操作
System.out.println("收到中断请求,执行清理后退出");
break;
}
// 执行不可中断的工作
}
}
正确使用中断机制可以实现线程的安全、协作式停止,避免使用已废弃的 stop()
方法。
sleep为什么抛出异常
调用本地方法:
private static native void sleepNanos0(long nanos) throws InterruptedException;
jvm.cpp中注册
JVM_ENTRY(void, JVM_SleepNanos(JNIEnv* env, jclass threadClass, jlong nanos))
if (nanos < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");
}
if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
// Save current thread state and restore it at the end of this block.
// And set new thread state to SLEEPING.
JavaThreadSleepState jtss(thread);
HOTSPOT_THREAD_SLEEP_BEGIN(nanos / NANOSECS_PER_MILLISEC);
if (nanos == 0) {
os::naked_yield();
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (!thread->sleep_nanos(nanos)) { // interrupted
// An asynchronous exception could have been thrown on
// us while we were sleeping. We do not overwrite those.
if (!HAS_PENDING_EXCEPTION) {
HOTSPOT_THREAD_SLEEP_END(1);
// TODO-FIXME: THROW_MSG returns which means we will not call set_state()
// to properly restore the thread state. That's likely wrong.
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
HOTSPOT_THREAD_SLEEP_END(0);
JVM_END
JVM_SleepNanos
这个函数是sleep抛出异常的地方。以下是该函数的核心逻辑:
首先检查
nanos
参数是否小于0,如果是则抛出IllegalArgumentException
异常(实际上Java层面已经检查了,所以c++层面不会报异常):if (nanos < 0) { THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range"); }
检查线程是否已被中断,如果是则抛出
InterruptedException
异常:if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) { THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); }
在实际睡眠过程中,如果线程被中断,也会抛出
InterruptedException
异常:if (!thread->sleep_nanos(nanos)) { // interrupted // An asynchronous exception could have been thrown on // us while we were sleeping. We do not overwrite those. if (!HAS_PENDING_EXCEPTION) { // TODO-FIXME: THROW_MSG returns which means we will not call set_state() // to properly restore the thread state. That's likely wrong. THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); } }
Object.wait异常
同样调用本地方法
private final native void wait0(long timeoutMillis) throws InterruptedException;
cpp实现在ObjectMonitor.cpp,这个函数很长,简化如下:
// ObjectMonitor的等待方法:处理线程等待、中断和超时逻辑
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
JavaThread* current = THREAD;
assert(InitDone, "未初始化");
CHECK_OWNER(); // 检查所有者,非所有者抛出异常
EventJavaMonitorWait wait_event;
EventVirtualThreadPinned vthread_pinned_event;
// 检查中断状态:若可中断且已中断,直接抛出异常
if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);
// 发送JVMTI监控等待事件
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait(current, object(), millis);
}
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(current, this, false);
}
if (wait_event.should_commit()) {
post_monitor_wait_event(&wait_event, this, 0, millis, false);
}
THROW(vmSymbols::java_lang_InterruptedException());
return;
}
// 虚拟线程处理:尝试挂起虚拟线程
freeze_result result;
ContinuationEntry* ce = current->last_continuation();
bool is_virtual = ce != nullptr && ce->is_virtual_thread();
if (is_virtual) {
if (interruptible && JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait(current, object(), millis);
}
current->set_current_waiting_monitor(this);
result = Continuation::try_preempt(current, ce->cont_oop(current));
if (result == freeze_ok) {
vthread_wait(current, millis);
current->set_current_waiting_monitor(nullptr);
return;
}
}
// 进入等待状态
JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);
if (!is_virtual) {
if (interruptible && JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait(current, object(), millis);
}
current->set_current_waiting_monitor(this);
}
// 创建等待节点并加入等待队列
ObjectWaiter node(current);
node.TState = ObjectWaiter::TS_WAIT;
current->_ParkEvent->reset();
OrderAccess::fence();
Thread::SpinAcquire(&_wait_set_lock);
add_waiter(&node);
Thread::SpinRelease(&_wait_set_lock);
// 退出监控器并准备挂起
intx save = _recursions;
_waiters++;
_recursions = 0;
exit(current);
guarantee(!has_owner(current), "invariant");
// 挂起线程:检查中断或超时
int ret = OS_OK;
int WasNotified = 0;
bool interrupted = interruptible && current->is_interrupted(false);
{
OSThread* osthread = current->osthread();
OSThreadWaitState osts(osthread, true);
assert(current->thread_state() == _thread_in_vm, "invariant");
{
ClearSuccOnSuspend csos(this);
ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true);
if (interrupted || HAS_PENDING_EXCEPTION) {
// 空处理:已有中断或异常
} else if (!node._notified) {
if (millis <= 0) {
current->_ParkEvent->park();
} else {
ret = current->_ParkEvent->park(millis);
}
}
}
// 从等待队列移除节点(如果需要)
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire(&_wait_set_lock);
if (node.TState == ObjectWaiter::TS_WAIT) {
dequeue_specific_waiter(&node);
assert(!node._notified, "invariant");
node.TState = ObjectWaiter::TS_RUN;
}
Thread::SpinRelease(&_wait_set_lock);
}
guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");
OrderAccess::loadload();
if (has_successor(current)) clear_successor();
WasNotified = node._notified;
// 发送JVMTI监控等待完成事件
if (JvmtiExport::should_post_monitor_waited()) {
JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);
if (node._notified && has_successor(current)) {
current->_ParkEvent->unpark();
}
}
if (wait_event.should_commit()) {
post_monitor_wait_event(&wait_event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);
}
OrderAccess::fence();
// 重新获取监控器锁
assert(!has_owner(current), "invariant");
ObjectWaiter::TStates v = node.TState;
if (v == ObjectWaiter::TS_RUN) {
NoPreemptMark npm(current);
enter(current);
} else {
guarantee(v == ObjectWaiter::TS_ENTER, "invariant");
reenter_internal(current, &node);
node.wait_reenter_end(this);
}
guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");
assert(has_owner(current), "invariant");
assert(!has_successor(current), "invariant");
}
// 清理状态:重置等待监控器引用
current->set_current_waiting_monitor(nullptr);
// 恢复递归计数
guarantee(_recursions == 0, "invariant");
int relock_count = JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current);
_recursions = save + relock_count;
current->inc_held_monitor_count(relock_count);
_waiters--;
// 验证后置条件
assert(has_owner(current), "invariant");
assert(!has_successor(current), "invariant");
assert_mark_word_consistency();
// 虚拟线程事件记录
if (ce != nullptr && ce->is_virtual_thread()) {
current->post_vthread_pinned_event(&vthread_pinned_event, "Object.wait", result);
}
// 检查通知结果:未通知可能是超时或中断
if (!WasNotified) {
if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
THROW(vmSymbols::java_lang_InterruptedException());
}
}
}
ObjectMonitor::wait
函数是 Java 对象监视器(Object Monitor)中实现 Object.wait()
方法的核心函数。它允许线程在对象上等待,直到其他线程调用该对象的 notify() 或 notifyAll() 方法。
1. 初始化和检查
获取当前线程信息并验证初始化状态
使用
CHECK_OWNER()
检查当前线程是否为监视器的所有者,如果不是则抛出IllegalMonitorStateException
2. 事件初始化
初始化
EventJavaMonitorWait
和EventVirtualThreadPinned
事件,用于监控和追踪目的
3. 中断检查
如果线程可中断(
interruptible
为 true)且已被中断,则在进入等待前抛出InterruptedException
4. 虚拟线程处理
如果是虚拟线程(virtual thread),则调用 vthread_wait 方法进行特殊处理,包括:
创建 ObjectWaiter 节点
将节点添加到等待队列
处理虚拟线程的挂起和状态转换
5. 常规等待逻辑
对于非虚拟线程或虚拟线程的后续步骤:
创建 JavaThreadInObjectWaitState 对象来管理线程等待状态
如果启用了 JVMTI,则发布 monitor wait 事件
保存当前的递归计数并重置为 0
退出监视器(调用 exit 方法)
增加等待者计数
6. 等待阶段
调用操作系统相关的 park 函数使线程进入等待状态
等待可能因超时、中断或 notify 调用而结束
7. 唤醒后处理
检查唤醒原因(通知、超时或中断)
如果是中断且线程可中断,则抛出
InterruptedException
重新获取监视器锁
恢复递归计数
发布 monitor waited 事件
8. 清理工作
- 减少等待者计数
- 清理 successor(如果存在)
- 删除 ObjectWaiter 节点
这个函数实现了 Java 中对象等待机制的核心逻辑,处理了线程状态管理、同步控制、事件追踪和异常处理等多个方面,确保线程安全地等待和被唤醒。
Thread.join
A线程调用ThreadB.join,实际上就只是调用对方的wait。当线程B结束会唤醒所有的等待者。
/**
等待该线程终止,最多等待 {@code millis} 毫秒。
如果超时时间设为 {@code 0} 则表示无限期等待(直到线程终止)。
•
如果该线程尚未被 {@link #start() 启动},则此方法会立即返回,无需等待。
•
•
@implNote 实现说明:
•
对于平台线程,该实现采用循环调用 {@code this.wait} 方法,并在 {@code this.isAlive} 条件满足时持续等待。
•
当线程终止时,会调用 {@code this.notifyAll} 方法唤醒等待的线程。
•
建议应用程序不要在 {@code Thread} 实例上使用 {@code wait}、{@code notify} 或 {@code notifyAll} 方法,
•
以避免与线程同步机制发生冲突或造成意外行为。
•
@throws InterruptedException
•
如果任何线程中断了当前线程。当抛出此异常时,当前线程的<i>中断状态</i>将被清除。
*/
public final void join(long millis) throws InterruptedException {
if (millis < 0)
throw new IllegalArgumentException("timeout value is negative");
if (this instanceof VirtualThread vthread) {
if (isAlive()) {
long nanos = MILLISECONDS.toNanos(millis);
vthread.joinNanos(nanos);
}
return;
}
synchronized (this) {
if (millis > 0) {
if (isAlive()) {
final long startTime = System.nanoTime();
long delay = millis;
do {
wait(delay);
} while (isAlive() && (delay = millis -
NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
}
} else {
while (isAlive()) {
wait(0);
}
}
}
}