深入解析 Java interrupt

发布于:2025-08-30 ⋅ 阅读:(20) ⋅ 点赞:(0)

Java 中断(Interrupt)机制详解

Java 的中断机制是一种协作式的线程间通信机制,用于请求另一个线程停止当前正在执行的操作。

Thread thread = Thread.currentThread();
thread.interrupt(); // 设置当前线程的中断状态

检查中断状态

// 检查中断状态
boolean isInterrupted = Thread.currentThread().isInterrupted();

// 检查并清除中断状态
boolean wasInterrupted = Thread.interrupted();

中断产生的后果

对阻塞方法的影响

当线程在以下阻塞方法中被中断时,会抛出 InterruptedException

try {
    Thread.sleep(1000);        // sleep
    object.wait();             // wait
    thread.join();             // join
    LockSupport.park();        // park
    // 以及各种 I/O 操作和同步器
} catch (InterruptedException e) {
    // 中断异常处理
    Thread.currentThread().interrupt(); // 恢复中断状态
}

对非阻塞代码的影响

对于非阻塞代码,中断不会自动产生异常,需要手动检查:

while (!Thread.currentThread().isInterrupted()) {
    // 执行任务
    System.out.println("Working...");
    // 模拟工作
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        // 睡眠时被中断
        Thread.currentThread().interrupt(); // 重新设置中断状态
        break;
    }
}
System.out.println("线程被中断,优雅退出");

正确的中断处理模式

1. 传播中断状态

public void task() {
    try {
        while (true) {
            // 执行工作
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }
            // 或者检查中断的阻塞操作
            Thread.sleep(100);
        }
    } catch (InterruptedException e) {
        // 恢复中断状态并退出
        Thread.currentThread().interrupt();
    }
}

2. 不可中断任务的处理

public void nonInterruptibleTask() {
    while (true) {
        if (Thread.currentThread().isInterrupted()) {
            // 执行清理操作
            System.out.println("收到中断请求,执行清理后退出");
            break;
        }
        // 执行不可中断的工作
    }
}

正确使用中断机制可以实现线程的安全、协作式停止,避免使用已废弃的 stop()方法。

sleep为什么抛出异常

调用本地方法:

    private static native void sleepNanos0(long nanos) throws InterruptedException;

jvm.cpp中注册

JVM_ENTRY(void, JVM_SleepNanos(JNIEnv* env, jclass threadClass, jlong nanos))
  if (nanos < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");
  }

  if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }

  // Save current thread state and restore it at the end of this block.
  // And set new thread state to SLEEPING.
  JavaThreadSleepState jtss(thread);

  HOTSPOT_THREAD_SLEEP_BEGIN(nanos / NANOSECS_PER_MILLISEC);

  if (nanos == 0) {
    os::naked_yield();
  } else {
    ThreadState old_state = thread->osthread()->get_state();
    thread->osthread()->set_state(SLEEPING);
    if (!thread->sleep_nanos(nanos)) { // interrupted
      // An asynchronous exception could have been thrown on
      // us while we were sleeping. We do not overwrite those.
      if (!HAS_PENDING_EXCEPTION) {
        HOTSPOT_THREAD_SLEEP_END(1);

        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
        // to properly restore the thread state.  That's likely wrong.
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    thread->osthread()->set_state(old_state);
  }
  HOTSPOT_THREAD_SLEEP_END(0);
JVM_END

JVM_SleepNanos这个函数是sleep抛出异常的地方。以下是该函数的核心逻辑:

  1. 首先检查nanos参数是否小于0,如果是则抛出IllegalArgumentException异常(实际上Java层面已经检查了,所以c++层面不会报异常):

  2. if (nanos < 0) {
      THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");
    }
    
  3. 检查线程是否已被中断,如果是则抛出InterruptedException异常:

    if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
      THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
    }
    
  4. 在实际睡眠过程中,如果线程被中断,也会抛出InterruptedException异常:

    if (!thread->sleep_nanos(nanos)) { // interrupted
      // An asynchronous exception could have been thrown on
      // us while we were sleeping. We do not overwrite those.
      if (!HAS_PENDING_EXCEPTION) {
        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
        // to properly restore the thread state.  That's likely wrong.
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    

Object.wait异常

同样调用本地方法

    private final native void wait0(long timeoutMillis) throws InterruptedException;

cpp实现在ObjectMonitor.cpp,这个函数很长,简化如下:

// ObjectMonitor的等待方法:处理线程等待、中断和超时逻辑
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  JavaThread* current = THREAD;
  assert(InitDone, "未初始化");
  CHECK_OWNER();  // 检查所有者,非所有者抛出异常

  EventJavaMonitorWait wait_event;
  EventVirtualThreadPinned vthread_pinned_event;

  // 检查中断状态:若可中断且已中断,直接抛出异常
  if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
    JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);
    
    // 发送JVMTI监控等待事件
    if (JvmtiExport::should_post_monitor_wait()) {
      JvmtiExport::post_monitor_wait(current, object(), millis);
    }
    if (JvmtiExport::should_post_monitor_waited()) {
      JvmtiExport::post_monitor_waited(current, this, false);
    }
    if (wait_event.should_commit()) {
      post_monitor_wait_event(&wait_event, this, 0, millis, false);
    }
    THROW(vmSymbols::java_lang_InterruptedException());
    return;
  }

  // 虚拟线程处理:尝试挂起虚拟线程
  freeze_result result;
  ContinuationEntry* ce = current->last_continuation();
  bool is_virtual = ce != nullptr && ce->is_virtual_thread();
  if (is_virtual) {
    if (interruptible && JvmtiExport::should_post_monitor_wait()) {
      JvmtiExport::post_monitor_wait(current, object(), millis);
    }
    current->set_current_waiting_monitor(this);
    result = Continuation::try_preempt(current, ce->cont_oop(current));
    if (result == freeze_ok) {
      vthread_wait(current, millis);
      current->set_current_waiting_monitor(nullptr);
      return;
    }
  }

  // 进入等待状态
  JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);
  if (!is_virtual) {
    if (interruptible && JvmtiExport::should_post_monitor_wait()) {
      JvmtiExport::post_monitor_wait(current, object(), millis);
    }
    current->set_current_waiting_monitor(this);
  }

  // 创建等待节点并加入等待队列
  ObjectWaiter node(current);
  node.TState = ObjectWaiter::TS_WAIT;
  current->_ParkEvent->reset();
  OrderAccess::fence();

  Thread::SpinAcquire(&_wait_set_lock);
  add_waiter(&node);
  Thread::SpinRelease(&_wait_set_lock);

  // 退出监控器并准备挂起
  intx save = _recursions;
  _waiters++;
  _recursions = 0;
  exit(current);
  guarantee(!has_owner(current), "invariant");

  // 挂起线程:检查中断或超时
  int ret = OS_OK;
  int WasNotified = 0;
  bool interrupted = interruptible && current->is_interrupted(false);

  {
    OSThread* osthread = current->osthread();
    OSThreadWaitState osts(osthread, true);
    assert(current->thread_state() == _thread_in_vm, "invariant");

    {
      ClearSuccOnSuspend csos(this);
      ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true);
      if (interrupted || HAS_PENDING_EXCEPTION) {
        // 空处理:已有中断或异常
      } else if (!node._notified) {
        if (millis <= 0) {
          current->_ParkEvent->park();
        } else {
          ret = current->_ParkEvent->park(millis);
        }
      }
    }

    // 从等待队列移除节点(如果需要)
    if (node.TState == ObjectWaiter::TS_WAIT) {
      Thread::SpinAcquire(&_wait_set_lock);
      if (node.TState == ObjectWaiter::TS_WAIT) {
        dequeue_specific_waiter(&node);
        assert(!node._notified, "invariant");
        node.TState = ObjectWaiter::TS_RUN;
      }
      Thread::SpinRelease(&_wait_set_lock);
    }

    guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");
    OrderAccess::loadload();
    if (has_successor(current)) clear_successor();
    WasNotified = node._notified;

    // 发送JVMTI监控等待完成事件
    if (JvmtiExport::should_post_monitor_waited()) {
      JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);
      if (node._notified && has_successor(current)) {
        current->_ParkEvent->unpark();
      }
    }

    if (wait_event.should_commit()) {
      post_monitor_wait_event(&wait_event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);
    }

    OrderAccess::fence();

    // 重新获取监控器锁
    assert(!has_owner(current), "invariant");
    ObjectWaiter::TStates v = node.TState;
    if (v == ObjectWaiter::TS_RUN) {
      NoPreemptMark npm(current);
      enter(current);
    } else {
      guarantee(v == ObjectWaiter::TS_ENTER, "invariant");
      reenter_internal(current, &node);
      node.wait_reenter_end(this);
    }

    guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");
    assert(has_owner(current), "invariant");
    assert(!has_successor(current), "invariant");
  }

  // 清理状态:重置等待监控器引用
  current->set_current_waiting_monitor(nullptr);

  // 恢复递归计数
  guarantee(_recursions == 0, "invariant");
  int relock_count = JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current);
  _recursions = save + relock_count;
  current->inc_held_monitor_count(relock_count);
  _waiters--;

  // 验证后置条件
  assert(has_owner(current), "invariant");
  assert(!has_successor(current), "invariant");
  assert_mark_word_consistency();

  // 虚拟线程事件记录
  if (ce != nullptr && ce->is_virtual_thread()) {
    current->post_vthread_pinned_event(&vthread_pinned_event, "Object.wait", result);
  }

  // 检查通知结果:未通知可能是超时或中断
  if (!WasNotified) {
    if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {
      THROW(vmSymbols::java_lang_InterruptedException());
    }
  }
}

ObjectMonitor::wait 函数是 Java 对象监视器(Object Monitor)中实现 Object.wait() 方法的核心函数。它允许线程在对象上等待,直到其他线程调用该对象的 notify() 或 notifyAll() 方法。

1. 初始化和检查

  • 获取当前线程信息并验证初始化状态

  • 使用 CHECK_OWNER() 检查当前线程是否为监视器的所有者,如果不是则抛出 IllegalMonitorStateException

2. 事件初始化

  • 初始化 EventJavaMonitorWait 和 EventVirtualThreadPinned 事件,用于监控和追踪目的

3. 中断检查

  • 如果线程可中断(interruptible 为 true)且已被中断,则在进入等待前抛出 InterruptedException

4. 虚拟线程处理

  • 如果是虚拟线程(virtual thread),则调用 vthread_wait 方法进行特殊处理,包括:

    • 创建 ObjectWaiter 节点

    • 将节点添加到等待队列

    • 处理虚拟线程的挂起和状态转换

5. 常规等待逻辑

对于非虚拟线程或虚拟线程的后续步骤:

  • 创建 JavaThreadInObjectWaitState 对象来管理线程等待状态

  • 如果启用了 JVMTI,则发布 monitor wait 事件

  • 保存当前的递归计数并重置为 0

  • 退出监视器(调用 exit 方法)

  • 增加等待者计数

6. 等待阶段

  • 调用操作系统相关的 park 函数使线程进入等待状态

  • 等待可能因超时、中断或 notify 调用而结束

7. 唤醒后处理

  • 检查唤醒原因(通知、超时或中断)

  • 如果是中断且线程可中断,则抛出 InterruptedException

  • 重新获取监视器锁

  • 恢复递归计数

  • 发布 monitor waited 事件

8. 清理工作

  • 减少等待者计数
  • 清理 successor(如果存在)
  • 删除 ObjectWaiter 节点

这个函数实现了 Java 中对象等待机制的核心逻辑,处理了线程状态管理、同步控制、事件追踪和异常处理等多个方面,确保线程安全地等待和被唤醒。

Thread.join

A线程调用ThreadB.join,实际上就只是调用对方的wait。当线程B结束会唤醒所有的等待者。

/​**​

    等待该线程终止,最多等待 {@code millis} 毫秒。

    如果超时时间设为 {@code 0} 则表示无限期等待(直到线程终止)。
    •

    如果该线程尚未被 {@link #start() 启动},则此方法会立即返回,无需等待。
    •
    •

    @implNote 实现说明:
    •

    对于平台线程,该实现采用循环调用 {@code this.wait} 方法,并在 {@code this.isAlive} 条件满足时持续等待。
    •

    当线程终止时,会调用 {@code this.notifyAll} 方法唤醒等待的线程。
    •

    建议应用程序不要在 {@code Thread} 实例上使用 {@code wait}、{@code notify} 或 {@code notifyAll} 方法,
    •

    以避免与线程同步机制发生冲突或造成意外行为。

    •

    @throws InterruptedException
    •

    如果任何线程中断了当前线程。当抛出此异常时,当前线程的<i>中断状态</i>将被清除。

    */
    public final void join(long millis) throws InterruptedException {
        if (millis < 0)
            throw new IllegalArgumentException("timeout value is negative");

        if (this instanceof VirtualThread vthread) {
            if (isAlive()) {
                long nanos = MILLISECONDS.toNanos(millis);
                vthread.joinNanos(nanos);
            }
            return;
        }

        synchronized (this) {
            if (millis > 0) {
                if (isAlive()) {
                    final long startTime = System.nanoTime();
                    long delay = millis;
                    do {
                        wait(delay);
                    } while (isAlive() && (delay = millis -
                             NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
                }
            } else {
                while (isAlive()) {
                    wait(0);
                }
            }
        }
    }


网站公告

今日签到

点亮在社区的每一天
去签到