Apache Ignite分片线程池:高并发保序新方案

发布于:2025-08-12 ⋅ 阅读:(16) ⋅ 点赞:(0)

这是一个非常典型的 分片线程池(Striped Thread Pool) 实现,名为 IgniteStripedThreadPoolExecutor,是 Apache Ignite 自定义的并发执行框架组件。


🧱 一、核心思想:什么是“Striped”线程池?

💡 关键特性:同一个“索引”(index)的任务,永远由同一个线程执行。

这解决了两个问题:

  1. 性能:避免锁竞争(多个任务操作同一数据时,串行化处理)
  2. 顺序性:保证特定数据的操作顺序(如 key=A 的消息不乱序)

类比理解:

想象一个快递分拣中心,有 N 个工人(线程),包裹按目的地编号 % N 分配给某个固定工人处理。

  • 所有发往“杭州”的包裹 → 都由 3 号工人处理
  • 所有发往“北京”的包裹 → 都由 1 号工人处理

这样既并行(多个城市同时处理),又保证了单个城市的顺序。


📦 二、字段解析

private final ExecutorService[] execs;
  • 这是一个 线程池数组,每个元素是一个独立的 ExecutorService
  • 数组长度 = concurrentLvl(并发级别),也就是“条带数”
  • 每个子线程池大小为 1(后面会看到)

✅ 相当于:创建了 N 个单线程池,组成一个“线程池组”


🔧 三、构造函数详解

public IgniteStripedThreadPoolExecutor(
    int concurrentLvl,
    String igniteInstanceName,
    String threadNamePrefix,
    UncaughtExceptionHandler eHnd,
    boolean allowCoreThreadTimeOut,
    long keepAliveTime)

参数说明:

参数 含义
concurrentLvl 并发等级 → 决定有多少个“条带”(即多少个子线程池)
igniteInstanceName 节点名,用于线程命名
threadNamePrefix 线程名前缀,如 "callback"
eHnd 异常处理器,捕获未处理异常
allowCoreThreadTimeOut 是否允许核心线程超时销毁
keepAliveTime 空闲线程等待新任务的最长时间

构造逻辑:

execs = new ExecutorService[concurrentLvl];

ThreadFactory factory = new IgniteThreadFactory(...);

for (int i = 0; i < concurrentLvl; i++) {
    IgniteThreadPoolExecutor executor = new IgniteThreadPoolExecutor(
        1,           // corePoolSize
        1,           // maximumPoolSize
        keepAliveTime,
        new LinkedBlockingQueue<>(),
        factory
    );
    executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
    execs[i] = executor;
}

✅ 每个子线程池都是 单线程执行器(Single-threaded)

🔍 为什么每个条带是单线程?

  • 保证 同一个 idx 的任务串行执行
  • 避免并发修改共享状态(如缓存、状态机)
  • 性能上接近无锁设计(只要哈希分布均匀)

🚀 四、核心方法:execute(Runnable task, int idx)

public void execute(Runnable task, int idx) {
    execs[threadId(idx)].execute(task);
}

这是唯一可用的提交任务的方法。

工作流程:

  1. 根据 idx 计算应该由哪个线程处理
  2. 提交到对应的子线程池
threadId(idx) 方法:
public int threadId(int idx) {
    return idx < execs.length ? idx : idx % execs.length;
}
  • 如果 idx 小于条带数 → 直接使用 idx
  • 否则取模 → 均匀分布到各个线程

✅ 这是一个 哈希映射策略,将任意整数 idx 映射到 [0, N) 范围内


⚠️ 五、禁用的方法(重要!)

这个类 故意禁用了标准 ExecutorService 的所有通用提交方法

@Override public void execute(Runnable cmd) {
    throw new UnsupportedOperationException();
}

@Override public <T> Future<T> submit(Callable<T> task) {
    throw new UnsupportedOperationException();
}
// ... 其他 submit/invoke 方法也都抛异常

❓ 为什么?

因为:

  • 没有 idx → 无法决定哪个线程执行
  • 必须显式指定 idx 才能路由任务
  • 强制用户遵守“按索引分片”的编程模型

✅ 这是一种 设计约束:你必须知道你的任务属于哪个“条带”


🔄 六、生命周期管理方法

这些方法对所有子线程池进行统一操作:

方法 行为
shutdown() 所有子线程池调用 shutdown()
shutdownNow() 所有子线程池尝试中断,并收集未执行任务
isShutdown() 所有都 shutdown 才返回 true
isTerminated() 所有都终止才返回 true
awaitTermination() 等待所有子线程池结束

✅ 符合 ExecutorService 接口规范,整体作为一个单元关闭


🎯 七、典型使用场景(在 Ignite 中)

这类线程池主要用于:

1. 异步回调执行callbackExecSvc

callbackExecSvc = new IgniteStripedThreadPoolExecutor(
    cfg.getAsyncCallbackPoolSize(),
    "callback",
    oomeHnd,
    false,
    0
);
  • 每个缓存键(key)的监听器回调 → 按 key.hashCode() % N 分配线程
  • 保证同一个 key 的事件不乱序

2. 数据流处理(DataStreamer)

  • 数据按 key 分片写入,每个分片由固定线程处理

3. 消息处理管道

  • 消息带有一个“会话ID”或“分区ID”,相同 ID 的消息必须顺序处理

📊 八、优缺点总结

优点 缺点
✅ 高并发 + 保序 ❌ 必须提前知道“分片键”(idx)
✅ 减少锁竞争(每个线程只处理自己的任务) ❌ 线程间负载可能不均(热 key 问题)
✅ 简单高效,接近无锁设计 ❌ 不支持 submit() 返回 Future(无法获取结果)
✅ 易于调试(知道哪个线程在处理哪类任务) ❌ 不能用于通用任务调度

🧩 九、和 JDK 原生类的对比

类型 特点
Executors.newFixedThreadPool(N) 所有任务随机分配给 N 个线程,无顺序保证
Executors.newSingleThreadExecutor() 所有任务串行执行,性能低
ForkJoinPool.commonPool() 工作窃取,适合分治任务
IgniteStripedThreadPoolExecutor 分片并行 + 局部串行,兼顾吞吐与顺序

🔁 它填补了“完全并行”和“完全串行”之间的空白


✅ 十、一句话总结

IgniteStripedThreadPoolExecutor 是一种 基于索引分片的任务调度器,它通过将任务绑定到固定的线程上来实现 局部串行 + 全局并行,特别适用于需要 顺序处理但又追求高吞吐 的场景(如事件回调、消息队列、数据流等)。


💡 使用建议

// 示例:按缓存 key 分发回调
int idx = key.hashCode();
stripedExecutor.execute(() -> {
    // 处理某个 key 的事件
}, idx);
  • 选择合适的 concurrentLvl(通常为 CPU 核心数或稍大)
  • 确保 idx 分布均匀,避免“热点线程”
  • 不要用它执行长时间阻塞任务(会影响该条带的所有任务)

如果你想实现类似功能,也可以基于此模式封装自己的 StripedExecutor