import com.xquant.risk.infra.ignite.properties.IgniteBaseProperties;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.apache.ignite.events.EventType.*;
@Slf4j
@AllArgsConstructor
public class Igniter {
private final IgniteConfiguration cfg;
private final IgniteEventListener igniteEventListener;
private final IgniteBaseProperties basicProperties;
public Ignite start() {
Ignite ignite = Ignition.start(this.cfg);
IgniteCluster cluster = ignite.cluster();
if (Objects.nonNull(cluster) && !cluster.active()) {
cluster.active(true);
}
List<Integer> joinEvent = Arrays.asList(EVT_NODE_JOINED, EVT_CLIENT_NODE_RECONNECTED);
List<Integer> leaveEvent = Arrays.asList(EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_SEGMENTED, EVT_CLIENT_NODE_DISCONNECTED);
if (basicProperties.isListenDiscoveryEvent()) {
((TcpDiscoverySpi) cfg.getDiscoverySpi()).getSpiContext().addLocalEventListener(evt -> {
if (log.isInfoEnabled()) {
log.info("当前Ignite回调事件: {}", evt.type());
}
try {
if (joinEvent.contains(evt.type())) {
igniteEventListener.onJoin(cluster, evt);
}
if (leaveEvent.contains(evt.type())) {
igniteEventListener.onLeave(cluster, evt);
}
} catch (Throwable e) {
log.error("当前Ignite回调服务发生异常", e);
IgniteEventListener.exit("1");
}
}, EVTS_DISCOVERY);
}
log.info("Ignite start success !");
return ignite;
}
}
这是一个 Apache Ignite 的 消息通信示例(Messaging Example),用于展示如何在集群节点之间通过消息传递进行通信。我们来逐步分析这个代码的结构和逻辑,帮助你深入理解其工作原理。
🧩 一、整体目标
该示例演示了:
- 在 Ignite 集群中发送和接收两种类型的消息:
- 有序消息(Ordered)
- 无序消息(Unordered)
- 使用
IgniteMessaging
API 实现跨节点通信。 - 利用
CountDownLatch
等待所有消息被确认处理完毕。
⚠️ 前提:至少启动两个节点(一个本地 + 一个或多个远程),且远程节点需使用 P2P 类加载配置(如
example-ignite.xml
)。
📦 二、关键类与接口说明
类/接口 | 作用 |
---|---|
Ignite |
核心入口,代表一个节点实例 |
Ignition.start(...) |
启动一个 Ignite 节点 |
IgniteMessaging |
提供消息发送/监听功能 |
ClusterGroup |
表示一组集群节点(比如所有远程节点) |
CountDownLatch |
并发工具,用于主线程等待其他线程完成任务 |
🔍 三、代码逐段解析
1️⃣ 静态常量定义
private static final int MESSAGES_NUM = 10;
- 每种消息发送 10 条。
private enum TOPIC { ORDERED, UNORDERED }
- 定义两个消息主题(可以理解为“频道”),分别用于区分有序和无序消息。
2️⃣ 主函数入口 main
try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
- 启动当前节点,加载配置文件(通常是
example-ignite.xml
)。 - 使用 try-with-resources 自动关闭。
if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2))
return;
- 检查集群是否至少有 2 个节点(本节点 + 至少一个远程节点),否则退出。
ClusterGroup rmtGrp = ignite.cluster().forRemotes();
- 获取除当前节点外的所有远程节点组成的集群组。
int msgCnt = rmtGrp.nodes().size() * MESSAGES_NUM;
CountDownLatch orderedLatch = new CountDownLatch(msgCnt);
CountDownLatch unorderedLatch = new CountDownLatch(msgCnt);
- 假设有 N 个远程节点,每个会收到 10 条消息 → 共
N * 10
条。 - Latch 用来等待这些消息被“回执”确认。
localListen(ignite.message(ignite.cluster().forLocal()), orderedLatch, unorderedLatch);
- 在本地节点注册监听器,用来接收来自远程节点的“回声”消息(即确认),每收到一条就
countDown()
。
startListening(ignite, ignite.message(rmtGrp));
- 在所有远程节点上注册监听器,当它们收到消息时打印并回传给发送方。
✅ 这是关键点:远程节点的监听器是通过
remoteListen()
动态部署的(借助 P2P 类加载)。
3️⃣ 发送消息阶段
发送无序消息(Unordered)
for (int i = 0; i < MESSAGES_NUM; i++)
ignite.message(rmtGrp).send(TOPIC.UNORDERED, Integer.toString(i));
- 使用
.send()
发送,不保证顺序。 - 所有远程节点都会收到编号 0~9 的消息,但可能乱序。
发送有序消息(Ordered)
for (int i = 0; i < MESSAGES_NUM; i++)
ignite.message(rmtGrp).sendOrdered(TOPIC.ORDERED, Integer.toString(i), 0);
- 使用
.sendOrdered(...)
发送,保证同一主题下的消息按发送顺序被处理。 - 第三个参数
0
是“顺序化键”(order key),决定哪些消息共享同一个顺序队列。
💡 举例:如果多个线程发消息,但用相同的 order key,则这些消息会被串行化处理。
4️⃣ 等待确认
orderedLatch.await();
unorderedLatch.await();
- 主线程阻塞,直到所有远程节点发回来的“回执”都被本地监听器接收完毕。
- 每个远程节点收到消息后会回传一次,所以总共要等
N * 10
次。
🎯 四、核心方法详解
✅ startListening(...)
—— 远程节点监听设置
imsg.remoteListen(TOPIC.ORDERED, (nodeId, msg) -> { ... });
- 在远程节点上注册一个持久化监听器,一旦收到
TOPIC.ORDERED
的消息就执行 Lambda。 - 打印日志,并回传消息给原节点(作为 ACK 确认)。
- 返回
true
表示继续监听。
🔁 回传路径:
ignite.message(ignite.cluster().forNodeId(nodeId)).send(...)
这一步实现了“消息回响(Echo)”,让发送方知道消息已被接收。
✅ localListen(...)
—— 本地监听确认消息
imsg.localListen(TOPIC.ORDERED, (nodeId, msg) -> {
orderedLatch.countDown();
return orderedLatch.getCount() > 0;
});
- 监听本地收到的来自远程节点的“回执”消息。
- 每收到一条,计数器减一。
- 当计数归零时自动停止监听(返回
false
)。
🔄 五、消息流动流程图
[本地节点]
│
├── send(TOPIC.UNORDERED, "0") ──────────────┐
├── send(TOPIC.UNORDERED, "1") ──────────────┤
│ ↓
│ [远程节点A/B/C...]
│ │
│ ├── 打印:"Received unordered message [msg=0]"
│ └── 回传 "0" 给本地节点
│
├── sendOrdered(TOPIC.ORDERED, "0") ────────┐
├── sendOrdered(TOPIC.ORDERED, "1") ────────┤
│ ↓
│ [远程节点A/B/C...]
│ │
│ ├── 打印:"Received ordered message [msg=0]"
│ └── 回传 "0" 给本地节点
│
←────────────────────────────────────────────
← 回执消息到达本地节点 → latch.countDown()
│
↓
当 latch 归零 → 输出 “Messaging example finished.”
⚖️ 六、有序 vs 无序 消息对比
特性 | 无序消息 .send() |
有序消息 .sendOrdered() |
---|---|---|
是否保证顺序 | ❌ 不保证 | ✅ 同一 order key 下保证顺序 |
性能 | 高(异步并行) | 稍低(需排队) |
适用场景 | 日志广播、通知等 | 事件流、状态变更序列等 |
🛠️ 七、注意事项 & 常见问题
P2P 类加载必须开启
- 远程节点必须使用
example-ignite.xml
启动,其中启用了<property name="peerClassLoadingEnabled" value="true"/>
- 否则
remoteListen()
无法部署监听器类(序列化失败)
- 远程节点必须使用
防火墙/网络问题
- 多节点通信需要端口互通(默认 47100+)
消息可靠性
- Ignite 消息是“最多一次”(at-most-once),不保证送达。
- 若需可靠传递,应结合缓存事件或自定义重试机制。
资源清理
remoteListen()
注册的监听器会在节点重启或显式取消前一直存在。- 可通过返回的
UUID
调用stopRemoteListen()
清理。
✅ 八、如何运行这个例子?
先启动一个或多个远程节点:
ignite.sh examples/config/example-ignite.xml
再运行本例(
MessagingExample.java
)作为客户端或另一个服务端节点。观察控制台输出,你会看到:
- 本地节点打印:“Finished sending…”
- 远程节点打印:“Received unordered/ordered message…”
- 最终本地节点打印:“Messaging example finished.”
🧠 九、实际应用场景
场景 | 说明 |
---|---|
分布式事件通知 | 某个节点状态变化,通知其他节点刷新缓存 |
日志聚合 | 收集各节点日志到中心节点 |
分布式协调 | 触发所有节点执行某操作(如重新加载配置) |
实时消息推送 | 构建轻量级发布/订阅系统 |
✅ 总结
这个例子展示了 Apache Ignite 的 分布式消息通信能力,核心要点如下:
要点 | 说明 |
---|---|
✅ 消息分类 | 支持有序/无序消息 |
✅ 动态监听 | remoteListen() 可远程部署监听器 |
✅ 主题机制 | 使用枚举或字符串作为消息主题 |
✅ 回调确认 | 结合 CountDownLatch 实现同步等待 |
✅ 易于集成 | 适合做集群内轻量级通信 |
💬 一句话总结:
这是一个典型的“广播 + 回执 + 同步等待”的分布式消息模式,利用 Ignite 的Messaging
API 实现跨节点通信,适用于需要节点间松耦合交互的场景。
如果你还想了解:
- 如何实现“可靠消息”?
- 如何用 Topic 做发布订阅?
- 如何监听缓存事件?
欢迎继续提问!