关于阻塞状态线程在安全点同步中无需调用block
函数的详细流程解析:
1. 安全点同步入口:SafepointSynchronize::begin()
VM线程调用此函数启动安全点,核心步骤如下:
获取线程锁(
Threads_lock
):防止新线程启动或旧线程退出。初始化计数器:
_waiting_to_block
设为当前线程总数,表示需等待阻塞的线程数。激活安全点(
arm_safepoint()
):设置全局安全点标志,触发线程轮询。调用
synchronize_threads()
:进入主循环,等待所有线程进入安全点。
2. 同步线程主循环:synchronize_threads()
此函数通过多次迭代检查所有线程状态,直到所有线程进入安全点:
首次遍历构建链表:将所有未进入安全点的线程(
thread_not_running(cur_tss) == false
)加入链表tss_head
。主循环处理未阻塞线程:
cpp
复制
下载
do { // 遍历链表,检查每个线程状态 while (cur_tss != NULL) { if (thread_not_running(cur_tss)) { // 从链表中移除该线程,减少still_running计数 --still_running; ... } else { // 保留在链表中,继续处理 ... } } } while (still_running > 0);
关键点:只有状态仍为运行的线程会保留在链表中,后续可能触发阻塞操作。
3. 状态检查逻辑:thread_not_running()
该函数判断线程是否已处于安全点或无需阻塞:
cpp
复制
下载
bool SafepointSynchronize::thread_not_running(ThreadSafepointState *cur_state) { if (!cur_state->is_running()) return true; // 已非运行 cur_state->examine_state_of_thread(...); // 检查线程状态 return !cur_state->is_running(); // 检查后再次确认 }
examine_state_of_thread()
:深入分析线程状态,决定是否需要阻塞。
4. 线程状态分析:examine_state_of_thread()
此函数通过safepoint_safe_with()
判断线程是否已安全:
cpp
复制
下载
void ThreadSafepointState::examine_state_of_thread(...) { JavaThreadState stable_state; if (!try_stable_load_state(&stable_state, ...)) return; // 无法获取稳定状态,视为运行中 if (safepoint_safe_with(_thread, stable_state)) { account_safe_thread(); // 标记线程为安全 return; } // 其他状态需继续运行,直到主动挂起 }
safepoint_safe_with()
:核心判断逻辑,针对阻塞状态线程。
5. 阻塞状态判定:safepoint_safe_with()
此函数明确处理阻塞状态:
cpp
复制
下载
static bool safepoint_safe_with(JavaThread *thread, JavaThreadState state) { switch(state) { case _thread_blocked: // 阻塞状态的线程直接视为安全 return true; case _thread_in_native: // 本地方法线程需检查栈可遍历性 return ...; default: return false; // 其他状态需进一步处理 } }
阻塞状态(
_thread_blocked
):直接返回true
,无需额外操作。
6. 标记安全线程:account_safe_thread()
当线程被判定为安全时,此函数更新全局状态:
cpp
复制
下载
void ThreadSafepointState::account_safe_thread() { SafepointSynchronize::decrement_waiting_to_block(); // 减少等待计数器 if (_thread->in_critical()) { SafepointSynchronize::increment_jni_active_count(); // 处理JNI临界区 } _safepoint_safe = true; // 标记线程为安全点安全 }
减少
_waiting_to_block
:表示该线程已处理完毕。标记
_safepoint_safe
:后续检查中is_running()
将返回false
。
7. 阻塞函数的调用条件
在提供的代码中,未直接调用block()
函数,但隐含逻辑如下:
需主动阻塞的线程:状态为运行中(如执行Java代码、JNI非临界区),会保留在
tss_head
链表中,等待后续处理(如轮询触发挂起)。阻塞状态的线程:因
safepoint_safe_with()
返回true
,被移出链表,不会进入需阻塞的分支。
总结:为何阻塞状态线程无需调用block
状态判定:通过
safepoint_safe_with()
识别阻塞状态,直接标记为安全。链表移除:在
synchronize_threads()
循环中,安全线程被移出待处理链表。计数器更新:
_waiting_to_block
递减,最终为0时同步完成。避免冗余操作:已阻塞线程无需二次挂起,提升效率。
此机制确保安全点同步仅处理必要线程,避免对已阻塞线程的不必要操作。
##源码
// Roll all threads forward to a safepoint and suspend them all
void SafepointSynchronize::begin() {
assert(Thread::current()->is_VM_thread(), "Only VM thread may execute a safepoint");
EventSafepointBegin begin_event;
SafepointTracing::begin(VMThread::vm_op_type());
Universe::heap()->safepoint_synchronize_begin();
// By getting the Threads_lock, we assure that no threads are about to start or
// exit. It is released again in SafepointSynchronize::end().
Threads_lock->lock();
assert( _state == _not_synchronized, "trying to safepoint synchronize with wrong state");
int nof_threads = Threads::number_of_threads();
_nof_threads_hit_polling_page = 0;
log_debug(safepoint)("Safepoint synchronization initiated using %s wait barrier. (%d threads)", _wait_barrier->description(), nof_threads);
// Reset the count of active JNI critical threads
_current_jni_active_count = 0;
// Set number of threads to wait for
_waiting_to_block = nof_threads;
jlong safepoint_limit_time = 0;
if (SafepointTimeout) {
// Set the limit time, so that it can be compared to see if this has taken
// too long to complete.
safepoint_limit_time = SafepointTracing::start_of_safepoint() + (jlong)SafepointTimeoutDelay * (NANOUNITS / MILLIUNITS);
timeout_error_printed = false;
}
EventSafepointStateSynchronization sync_event;
int initial_running = 0;
// Arms the safepoint, _current_jni_active_count and _waiting_to_block must be set before.
arm_safepoint();
// Will spin until all threads are safe.
int iterations = synchronize_threads(safepoint_limit_time, nof_threads, &initial_running);
assert(_waiting_to_block == 0, "No thread should be running");
#ifndef PRODUCT
// Mark all threads
if (VerifyCrossModifyFence) {
JavaThreadIteratorWithHandle jtiwh;
for (; JavaThread *cur = jtiwh.next(); ) {
cur->set_requires_cross_modify_fence(true);
}
}
if (safepoint_limit_time != 0) {
jlong current_time = os::javaTimeNanos();
if (safepoint_limit_time < current_time) {
log_warning(safepoint)("# SafepointSynchronize: Finished after "
INT64_FORMAT_W(6) " ms",
(int64_t)(current_time - SafepointTracing::start_of_safepoint()) / (NANOUNITS / MILLIUNITS));
}
}
#endif
assert(Threads_lock->owned_by_self(), "must hold Threads_lock");
// Record state
_state = _synchronized;
OrderAccess::fence();
// Set the new id
++_safepoint_id;
#ifdef ASSERT
// Make sure all the threads were visited.
for (JavaThreadIteratorWithHandle jtiwh; JavaThread *cur = jtiwh.next(); ) {
assert(cur->was_visited_for_critical_count(_safepoint_counter), "missed a thread");
}
#endif // ASSERT
// Update the count of active JNI critical regions
GCLocker::set_jni_lock_count(_current_jni_active_count);
post_safepoint_synchronize_event(sync_event,
_safepoint_id,
initial_running,
_waiting_to_block, iterations);
SafepointTracing::synchronized(nof_threads, initial_running, _nof_threads_hit_polling_page);
// We do the safepoint cleanup first since a GC related safepoint
// needs cleanup to be completed before running the GC op.
EventSafepointCleanup cleanup_event;
do_cleanup_tasks();
post_safepoint_cleanup_event(cleanup_event, _safepoint_id);
post_safepoint_begin_event(begin_event, _safepoint_id, nof_threads, _current_jni_active_count);
SafepointTracing::cleanup();
}
int SafepointSynchronize::synchronize_threads(jlong safepoint_limit_time, int nof_threads, int* initial_running)
{
JavaThreadIteratorWithHandle jtiwh;
#ifdef ASSERT
for (; JavaThread *cur = jtiwh.next(); ) {
assert(cur->safepoint_state()->is_running(), "Illegal initial state");
}
jtiwh.rewind();
#endif // ASSERT
// Iterate through all threads until it has been determined how to stop them all at a safepoint.
int still_running = nof_threads;
ThreadSafepointState *tss_head = NULL;
ThreadSafepointState **p_prev = &tss_head;
for (; JavaThread *cur = jtiwh.next(); ) {
ThreadSafepointState *cur_tss = cur->safepoint_state();
assert(cur_tss->get_next() == NULL, "Must be NULL");
if (thread_not_running(cur_tss)) {
--still_running;
} else {
*p_prev = cur_tss;
p_prev = cur_tss->next_ptr();
}
}
*p_prev = NULL;
DEBUG_ONLY(assert_list_is_valid(tss_head, still_running);)
*initial_running = still_running;
// If there is no thread still running, we are already done.
if (still_running <= 0) {
assert(tss_head == NULL, "Must be empty");
return 1;
}
int iterations = 1; // The first iteration is above.
int64_t start_time = os::javaTimeNanos();
do {
// Check if this has taken too long:
if (SafepointTimeout && safepoint_limit_time < os::javaTimeNanos()) {
print_safepoint_timeout();
}
p_prev = &tss_head;
ThreadSafepointState *cur_tss = tss_head;
while (cur_tss != NULL) {
assert(cur_tss->is_running(), "Illegal initial state");
if (thread_not_running(cur_tss)) {
--still_running;
*p_prev = NULL;
ThreadSafepointState *tmp = cur_tss;
cur_tss = cur_tss->get_next();
tmp->set_next(NULL);
} else {
*p_prev = cur_tss;
p_prev = cur_tss->next_ptr();
cur_tss = cur_tss->get_next();
}
}
DEBUG_ONLY(assert_list_is_valid(tss_head, still_running);)
if (still_running > 0) {
back_off(start_time);
}
iterations++;
} while (still_running > 0);
assert(tss_head == NULL, "Must be empty");
return iterations;
}
bool SafepointSynchronize::thread_not_running(ThreadSafepointState *cur_state) {
if (!cur_state->is_running()) {
return true;
}
cur_state->examine_state_of_thread(SafepointSynchronize::safepoint_counter());
if (!cur_state->is_running()) {
return true;
}
LogTarget(Trace, safepoint) lt;
if (lt.is_enabled()) {
ResourceMark rm;
LogStream ls(lt);
cur_state->print_on(&ls);
}
return false;
}
void ThreadSafepointState::examine_state_of_thread(uint64_t safepoint_count) {
assert(is_running(), "better be running or just have hit safepoint poll");
JavaThreadState stable_state;
if (!SafepointSynchronize::try_stable_load_state(&stable_state, _thread, safepoint_count)) {
// We could not get stable state of the JavaThread.
// Consider it running and just return.
return;
}
if (safepoint_safe_with(_thread, stable_state)) {
std::cout << "@@@@yym%%%%----myThreadName----safepoint_safe_with----" << _thread->name() << std::endl;
account_safe_thread();
return;
}
// All other thread states will continue to run until they
// transition and self-block in state _blocked
// Safepoint polling in compiled code causes the Java threads to do the same.
// Note: new threads may require a malloc so they must be allowed to finish
assert(is_running(), "examine_state_of_thread on non-running thread");
return;
}
static bool safepoint_safe_with(JavaThread *thread, JavaThreadState state) {
switch(state) {
case _thread_in_native:
// native threads are safe if they have no java stack or have walkable stack
return !thread->has_last_Java_frame() || thread->frame_anchor()->walkable();
case _thread_blocked:
// On wait_barrier or blocked.
// Blocked threads should already have walkable stack.
assert(!thread->has_last_Java_frame() || thread->frame_anchor()->walkable(), "blocked and not walkable");
return true;
default:
return false;
}
}
void ThreadSafepointState::account_safe_thread() {
SafepointSynchronize::decrement_waiting_to_block();
if (_thread->in_critical()) {
// Notice that this thread is in a critical section
SafepointSynchronize::increment_jni_active_count();
}
DEBUG_ONLY(_thread->set_visited_for_critical_count(SafepointSynchronize::safepoint_counter());)
assert(!_safepoint_safe, "Must be unsafe before safe");
_safepoint_safe = true;
}