Java 中断(Interrupt)机制详解
Java 的中断机制是一种协作式的线程间通信机制,用于请求另一个线程停止当前正在执行的操作。
Thread thread = Thread.currentThread();
thread.interrupt(); // 设置当前线程的中断状态
检查中断状态
// 检查中断状态
boolean isInterrupted = Thread.currentThread().isInterrupted();// 检查并清除中断状态
boolean wasInterrupted = Thread.interrupted();
中断产生的后果
对阻塞方法的影响
当线程在以下阻塞方法中被中断时,会抛出 InterruptedException
:
try {Thread.sleep(1000); // sleepobject.wait(); // waitthread.join(); // joinLockSupport.park(); // park// 以及各种 I/O 操作和同步器
} catch (InterruptedException e) {// 中断异常处理Thread.currentThread().interrupt(); // 恢复中断状态
}
对非阻塞代码的影响
对于非阻塞代码,中断不会自动产生异常,需要手动检查:
while (!Thread.currentThread().isInterrupted()) {// 执行任务System.out.println("Working...");// 模拟工作try {Thread.sleep(100);} catch (InterruptedException e) {// 睡眠时被中断Thread.currentThread().interrupt(); // 重新设置中断状态break;}
}
System.out.println("线程被中断,优雅退出");
正确的中断处理模式
1. 传播中断状态
public void task() {try {while (true) {// 执行工作if (Thread.currentThread().isInterrupted()) {throw new InterruptedException();}// 或者检查中断的阻塞操作Thread.sleep(100);}} catch (InterruptedException e) {// 恢复中断状态并退出Thread.currentThread().interrupt();}
}
2. 不可中断任务的处理
public void nonInterruptibleTask() {while (true) {if (Thread.currentThread().isInterrupted()) {// 执行清理操作System.out.println("收到中断请求,执行清理后退出");break;}// 执行不可中断的工作}
}
正确使用中断机制可以实现线程的安全、协作式停止,避免使用已废弃的 stop()
方法。
sleep为什么抛出异常
调用本地方法:
private static native void sleepNanos0(long nanos) throws InterruptedException;
jvm.cpp中注册
JVM_ENTRY(void, JVM_SleepNanos(JNIEnv* env, jclass threadClass, jlong nanos))if (nanos < 0) {THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range");}if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}// Save current thread state and restore it at the end of this block.// And set new thread state to SLEEPING.JavaThreadSleepState jtss(thread);HOTSPOT_THREAD_SLEEP_BEGIN(nanos / NANOSECS_PER_MILLISEC);if (nanos == 0) {os::naked_yield();} else {ThreadState old_state = thread->osthread()->get_state();thread->osthread()->set_state(SLEEPING);if (!thread->sleep_nanos(nanos)) { // interrupted// An asynchronous exception could have been thrown on// us while we were sleeping. We do not overwrite those.if (!HAS_PENDING_EXCEPTION) {HOTSPOT_THREAD_SLEEP_END(1);// TODO-FIXME: THROW_MSG returns which means we will not call set_state()// to properly restore the thread state. That's likely wrong.THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");}}thread->osthread()->set_state(old_state);}HOTSPOT_THREAD_SLEEP_END(0);
JVM_END
JVM_SleepNanos
这个函数是sleep抛出异常的地方。以下是该函数的核心逻辑:
-
首先检查
nanos
参数是否小于0,如果是则抛出IllegalArgumentException
异常(实际上Java层面已经检查了,所以c++层面不会报异常): -
if (nanos < 0) {THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "nanosecond timeout value out of range"); }
-
检查线程是否已被中断,如果是则抛出
InterruptedException
异常:if (thread->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted"); }
-
在实际睡眠过程中,如果线程被中断,也会抛出
InterruptedException
异常:if (!thread->sleep_nanos(nanos)) { // interrupted// An asynchronous exception could have been thrown on// us while we were sleeping. We do not overwrite those.if (!HAS_PENDING_EXCEPTION) {// TODO-FIXME: THROW_MSG returns which means we will not call set_state()// to properly restore the thread state. That's likely wrong.THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");} }
Object.wait异常
同样调用本地方法
private final native void wait0(long timeoutMillis) throws InterruptedException;
cpp实现在ObjectMonitor.cpp,这个函数很长,简化如下:
// ObjectMonitor的等待方法:处理线程等待、中断和超时逻辑
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {JavaThread* current = THREAD;assert(InitDone, "未初始化");CHECK_OWNER(); // 检查所有者,非所有者抛出异常EventJavaMonitorWait wait_event;EventVirtualThreadPinned vthread_pinned_event;// 检查中断状态:若可中断且已中断,直接抛出异常if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);// 发送JVMTI监控等待事件if (JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}if (JvmtiExport::should_post_monitor_waited()) {JvmtiExport::post_monitor_waited(current, this, false);}if (wait_event.should_commit()) {post_monitor_wait_event(&wait_event, this, 0, millis, false);}THROW(vmSymbols::java_lang_InterruptedException());return;}// 虚拟线程处理:尝试挂起虚拟线程freeze_result result;ContinuationEntry* ce = current->last_continuation();bool is_virtual = ce != nullptr && ce->is_virtual_thread();if (is_virtual) {if (interruptible && JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}current->set_current_waiting_monitor(this);result = Continuation::try_preempt(current, ce->cont_oop(current));if (result == freeze_ok) {vthread_wait(current, millis);current->set_current_waiting_monitor(nullptr);return;}}// 进入等待状态JavaThreadInObjectWaitState jtiows(current, millis != 0, interruptible);if (!is_virtual) {if (interruptible && JvmtiExport::should_post_monitor_wait()) {JvmtiExport::post_monitor_wait(current, object(), millis);}current->set_current_waiting_monitor(this);}// 创建等待节点并加入等待队列ObjectWaiter node(current);node.TState = ObjectWaiter::TS_WAIT;current->_ParkEvent->reset();OrderAccess::fence();Thread::SpinAcquire(&_wait_set_lock);add_waiter(&node);Thread::SpinRelease(&_wait_set_lock);// 退出监控器并准备挂起intx save = _recursions;_waiters++;_recursions = 0;exit(current);guarantee(!has_owner(current), "invariant");// 挂起线程:检查中断或超时int ret = OS_OK;int WasNotified = 0;bool interrupted = interruptible && current->is_interrupted(false);{OSThread* osthread = current->osthread();OSThreadWaitState osts(osthread, true);assert(current->thread_state() == _thread_in_vm, "invariant");{ClearSuccOnSuspend csos(this);ThreadBlockInVMPreprocess<ClearSuccOnSuspend> tbivs(current, csos, true);if (interrupted || HAS_PENDING_EXCEPTION) {// 空处理:已有中断或异常} else if (!node._notified) {if (millis <= 0) {current->_ParkEvent->park();} else {ret = current->_ParkEvent->park(millis);}}}// 从等待队列移除节点(如果需要)if (node.TState == ObjectWaiter::TS_WAIT) {Thread::SpinAcquire(&_wait_set_lock);if (node.TState == ObjectWaiter::TS_WAIT) {dequeue_specific_waiter(&node);assert(!node._notified, "invariant");node.TState = ObjectWaiter::TS_RUN;}Thread::SpinRelease(&_wait_set_lock);}guarantee(node.TState != ObjectWaiter::TS_WAIT, "invariant");OrderAccess::loadload();if (has_successor(current)) clear_successor();WasNotified = node._notified;// 发送JVMTI监控等待完成事件if (JvmtiExport::should_post_monitor_waited()) {JvmtiExport::post_monitor_waited(current, this, ret == OS_TIMEOUT);if (node._notified && has_successor(current)) {current->_ParkEvent->unpark();}}if (wait_event.should_commit()) {post_monitor_wait_event(&wait_event, this, node._notifier_tid, millis, ret == OS_TIMEOUT);}OrderAccess::fence();// 重新获取监控器锁assert(!has_owner(current), "invariant");ObjectWaiter::TStates v = node.TState;if (v == ObjectWaiter::TS_RUN) {NoPreemptMark npm(current);enter(current);} else {guarantee(v == ObjectWaiter::TS_ENTER, "invariant");reenter_internal(current, &node);node.wait_reenter_end(this);}guarantee(node.TState == ObjectWaiter::TS_RUN, "invariant");assert(has_owner(current), "invariant");assert(!has_successor(current), "invariant");}// 清理状态:重置等待监控器引用current->set_current_waiting_monitor(nullptr);// 恢复递归计数guarantee(_recursions == 0, "invariant");int relock_count = JvmtiDeferredUpdates::get_and_reset_relock_count_after_wait(current);_recursions = save + relock_count;current->inc_held_monitor_count(relock_count);_waiters--;// 验证后置条件assert(has_owner(current), "invariant");assert(!has_successor(current), "invariant");assert_mark_word_consistency();// 虚拟线程事件记录if (ce != nullptr && ce->is_virtual_thread()) {current->post_vthread_pinned_event(&vthread_pinned_event, "Object.wait", result);}// 检查通知结果:未通知可能是超时或中断if (!WasNotified) {if (interruptible && current->is_interrupted(true) && !HAS_PENDING_EXCEPTION) {THROW(vmSymbols::java_lang_InterruptedException());}}
}
ObjectMonitor::wait
函数是 Java 对象监视器(Object Monitor)中实现 Object.wait()
方法的核心函数。它允许线程在对象上等待,直到其他线程调用该对象的 notify() 或 notifyAll() 方法。
1. 初始化和检查
-
获取当前线程信息并验证初始化状态
-
使用
CHECK_OWNER()
检查当前线程是否为监视器的所有者,如果不是则抛出IllegalMonitorStateException
2. 事件初始化
-
初始化
EventJavaMonitorWait
和EventVirtualThreadPinned
事件,用于监控和追踪目的
3. 中断检查
-
如果线程可中断(
interruptible
为 true)且已被中断,则在进入等待前抛出InterruptedException
4. 虚拟线程处理
-
如果是虚拟线程(virtual thread),则调用 vthread_wait 方法进行特殊处理,包括:
-
创建 ObjectWaiter 节点
-
将节点添加到等待队列
-
处理虚拟线程的挂起和状态转换
-
5. 常规等待逻辑
对于非虚拟线程或虚拟线程的后续步骤:
-
创建 JavaThreadInObjectWaitState 对象来管理线程等待状态
-
如果启用了 JVMTI,则发布 monitor wait 事件
-
保存当前的递归计数并重置为 0
-
退出监视器(调用 exit 方法)
-
增加等待者计数
6. 等待阶段
-
调用操作系统相关的 park 函数使线程进入等待状态
-
等待可能因超时、中断或 notify 调用而结束
7. 唤醒后处理
-
检查唤醒原因(通知、超时或中断)
-
如果是中断且线程可中断,则抛出
InterruptedException
-
重新获取监视器锁
-
恢复递归计数
-
发布 monitor waited 事件
8. 清理工作
- 减少等待者计数
- 清理 successor(如果存在)
- 删除 ObjectWaiter 节点
这个函数实现了 Java 中对象等待机制的核心逻辑,处理了线程状态管理、同步控制、事件追踪和异常处理等多个方面,确保线程安全地等待和被唤醒。
Thread.join
A线程调用ThreadB.join,实际上就只是调用对方的wait。当线程B结束会唤醒所有的等待者。
/**等待该线程终止,最多等待 {@code millis} 毫秒。如果超时时间设为 {@code 0} 则表示无限期等待(直到线程终止)。•如果该线程尚未被 {@link #start() 启动},则此方法会立即返回,无需等待。••@implNote 实现说明:•对于平台线程,该实现采用循环调用 {@code this.wait} 方法,并在 {@code this.isAlive} 条件满足时持续等待。•当线程终止时,会调用 {@code this.notifyAll} 方法唤醒等待的线程。•建议应用程序不要在 {@code Thread} 实例上使用 {@code wait}、{@code notify} 或 {@code notifyAll} 方法,•以避免与线程同步机制发生冲突或造成意外行为。•@throws InterruptedException•如果任何线程中断了当前线程。当抛出此异常时,当前线程的<i>中断状态</i>将被清除。*/public final void join(long millis) throws InterruptedException {if (millis < 0)throw new IllegalArgumentException("timeout value is negative");if (this instanceof VirtualThread vthread) {if (isAlive()) {long nanos = MILLISECONDS.toNanos(millis);vthread.joinNanos(nanos);}return;}synchronized (this) {if (millis > 0) {if (isAlive()) {final long startTime = System.nanoTime();long delay = millis;do {wait(delay);} while (isAlive() && (delay = millis -NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);}} else {while (isAlive()) {wait(0);}}}}