Apache Ignite 核心组件:GridClosureProcessor解析

发布于:2025-08-12 ⋅ 阅读:(21) ⋅ 点赞:(0)

这是一个 Apache Ignite 中非常核心的组件 —— GridClosureProcessor,它是 分布式闭包(Closure)执行的调度中枢,负责在集群节点上异步执行用户提交的任务(如 RunnableClosure)。

我们来逐层深入理解它的设计思想、关键机制和代码逻辑。


🧱 一、类概览:GridClosureProcessor

public class GridClosureProcessor extends GridProcessorAdapter
  • 职责:处理所有基于闭包(函数式)的远程执行请求
  • 常见用途:
    • compute().run(Runnable)
    • compute().call(Closure)
    • compute().broadcast(Closure)
    • cache().affinity().run(...)
  • 它是 ComputeTask 的底层支撑模块

🔩 二、关键字段解析

字段 类型 作用
pools PoolProcessor 线程池管理器,用于获取执行任务的线程池
busyLock GridSpinReadWriteLock 控制组件在 停止期间不接受新任务
stopping boolean 标记当前处理器是否正在停止

⚠️ 这三个字段共同实现了 “优雅关闭” 的核心逻辑。


🔒 三、busyLock:优雅关闭的关键机制

1. 什么是 GridSpinReadWriteLock

  • Ignite 自定义的 自旋读写锁
  • 特点:
    • 读锁可重入、允许多个线程同时持有
    • 写锁独占,用于“停止”阶段
    • 使用 自旋 + sleep 避免线程频繁阻塞唤醒

2. 读锁(readLock()):

  • 所有任务提交方法(runAsync, callAsync, broadcast)都先获取读锁
  • 表示:“我正在使用这个处理器”
  • 允许多个线程并发提交任务

3. 写锁(tryWriteLock(...)):

  • onKernalStop(...) 中使用
  • 目的:阻止任何新任务提交,并标记为“停止中”

🛑 四、onKernalStop(...):优雅关闭流程

@Override
public void onKernalStop(boolean cancel) {
    boolean interrupted = false;

    while (true) {
        try {
            if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
                break;
            else
                Thread.sleep(200);
        }
        catch (InterruptedException ignore) {
            interrupted = true;
        }
    }

    try {
        if (interrupted)
            Thread.currentThread().interrupt();

        stopping = true; // 标记为停止状态
    }
    finally {
        busyLock.writeUnlock();
    }
}

🔍 流程详解:

  1. 尝试获取写锁

    • tryWriteLock(200ms):尝试在 200ms 内获取写锁
    • 如果有线程持有读锁(即正在提交任务),则失败
    • 失败后 Thread.sleep(200),然后重试
  2. 为什么是“Busy Wait”?

    • 注解 @SuppressWarnings("BusyWait") 表示这是有意为之的忙等待
    • 目的:尽快完成关闭,避免长时间阻塞
    • 每 200ms 尝试一次,不会过度消耗 CPU
  3. 处理中断

    • 如果等待期间被中断,记录 interrupted = true
    • 最后恢复中断状态(线程安全最佳实践)
  4. 设置 stopping = true

    • 获取写锁后,设置标志位
    • 之后所有 runAsync 等调用都会被拒绝
  5. 释放写锁

    • 即使发生异常,也确保释放锁

✅ 这是一个典型的 “关闭守卫”模式:先阻止新请求,再清理资源。


🚀 五、任务提交方法分析

所有任务提交方法都遵循统一模式:

busyLock.readLock();
try {
    if (stopping) reject();
    // 提交任务
} finally {
    busyLock.readUnlock();
}

我们以 runAsync(...) 为例:

runAsync(...):运行一批 Runnable

public ComputeTaskInternalFuture<?> runAsync(...) {
    assert mode != null;
    assert !F.isEmpty(jobs);

    busyLock.readLock(); // 获取读锁

    try {
        if (stopping) {
            return finishedFuture(new IgniteCheckedException("Closure processor cannot be used on stopped grid"));
        }

        if (F.isEmpty(nodes))
            return finishedFuture(U.emptyTopologyException());

        ctx.task().setThreadContext(TC_SUBGRID, nodes);

        return ctx.task().execute(new T1(mode, jobs), null, sys, execName);
    }
    finally {
        busyLock.readUnlock(); // 释放读锁
    }
}
关键点:
  • stopping 检查:如果正在停止,直接返回失败 future
  • nodes 检查:拓扑为空则返回空拓扑异常
  • ctx.task().execute(...):交给 TaskProcessor 执行(T1 是一个内部任务类型)
  • 使用 sys 参数决定使用 系统线程池 还是 公共线程池

callAsync(...):远程调用 Closure

public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, T arg, ...)
  • 执行一个带返回值的函数(Closure<T,R>
  • 返回 ComputeTaskInternalFuture<R>,可获取结果

broadcast(...):广播到所有节点

public <T, R> IgniteInternalFuture<Collection<R>> broadcast(...)
  • nodes 列表中的每个节点上执行 job
  • 返回一个 Future<Collection<R>>,包含所有节点的返回值

affinityRun(...):基于数据亲和性执行

public ComputeTaskInternalFuture<?> affinityRun(...)
  • 关键用途:将任务发送到 特定缓存分区(partition)的主节点
  • 流程:
    1. 获取当前拓扑版本 readyAffinityVersion()
    2. 使用 ctx.affinity().mapPartitionToNode(...) 找到负责该分区的节点
    3. 只在那个节点上执行任务
  • 优势:本地化执行,避免数据移动,性能极高

💡 这是 Ignite 实现“移动计算而非数据”的核心机制之一。


🧩 六、T1, T8, T11, T4 是什么?

这些是 内部任务类(定义在 GridTaskInternalFuture 或内部类中),用于包装用户任务:

任务类 包装的任务类型
T1 GridClosureCallMode + Collection<Runnable>
T8 IgniteClosure<T,R>
T11 Broadcast 任务
T4 Affinity 任务

它们都继承自 ComputeTaskAdapter,由 TaskProcessor 调度执行。


🎯 七、整体架构图(简化)

+---------------------+
|  User Code          |
|  compute().run(...) |
+----------+----------+
           |
           v
+---------------------+
| GridClosureProcessor|
| - busyLock          |
| - stopping          |
+----------+----------+
           |
           v
+---------------------+
|  TaskProcessor      |
|  execute(Task)      |
+----------+----------+
           |
           v
+---------------------+
|  PoolProcessor      |
|  系统/公共线程池     |
+---------------------+

✅ 八、设计亮点总结

特性 说明
读写锁控制关闭 读锁允许多任务并发提交,写锁确保关闭时原子性
优雅拒绝新任务 stopping 标志 + finishedFuture 快速失败
支持多种执行模式 单节点、广播、亲和性执行
与 Task 子系统集成 复用 TaskProcessor 的调度能力
线程安全 所有提交路径都受锁保护
可观测性 调试日志、异常信息清晰

📌 九、一句话总结

GridClosureProcessor 是 Ignite 的 分布式任务调度入口,它通过 读写锁机制 实现了 高并发提交 + 优雅关闭,并支持 普通执行、广播、数据亲和性执行 等多种模式,是 Compute 子系统的核心引擎。


💡 十、你可以借鉴的设计模式

1. 关闭守卫模式(Shutdown Guard)

private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private volatile boolean shuttingDown = false;

public void submit(Runnable task) {
    shutdownLock.readLock().lock();
    try {
        if (shuttingDown) throw new RejectedExecutionException();
        // 执行任务
    } finally {
        shutdownLock.readLock().unlock();
    }
}

public void shutdown() {
    shutdownLock.writeLock().lock();
    try {
        shuttingDown = true;
    } finally {
        shutdownLock.writeLock().unlock();
    }
}

2. 快速失败(Fail-Fast)

  • 不让任务进入队列,而是在入口就拒绝
  • 返回一个“已完成的失败 Future”,避免资源浪费

🏁 结语

GridClosureProcessor 虽然代码量不大,但它体现了分布式系统中 资源管理、并发控制、生命周期管理 的最佳实践。理解它,有助于你设计自己的 高可用、可扩展的任务调度系统


网站公告

今日签到

点亮在社区的每一天
去签到