package org.apache.ignite.examples.messaging;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;
import java.util.concurrent.CountDownLatch;
/**
* Demonstrates simple message exchange between local and remote nodes.
* <p>
* To run this example you must have at least one remote node started.
* <p>
* Remote nodes should always be started with special configuration file which
* enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
* <p>
* Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node
* with {@code examples/config/example-ignite.xml} configuration.
*/
public class MessagingPingPongExample {
/**
* Executes example.
*
* @param args Command line arguments, none required.
* @throws Exception If example execution failed.
*/
public static void main(String[] args) throws Exception {
// Game is played over the default ignite.
try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2))
return;
System.out.println();
System.out.println(">>> Messaging ping-pong example started.");
// Pick random remote node as a partner.
ClusterGroup nodeB = ignite.cluster().forRemotes().forRandom();
// Note that both nodeA and nodeB will always point to
// same nodes regardless of whether they were implicitly
// serialized and deserialized on another node as part of
// anonymous closure's state during its remote execution.
// Set up remote player.
ignite.message(nodeB).remoteListen(null, (nodeId, rcvMsg) -> {
System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');
if ("PING".equals(rcvMsg)) {
ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PONG");
return true; // Continue listening.
}
return false; // Unsubscribe.
});
int MAX_PLAYS = 10;
final CountDownLatch cnt = new CountDownLatch(MAX_PLAYS);
// Set up local player.
ignite.message().localListen(null, (nodeId, rcvMsg) -> {
System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');
if (cnt.getCount() == 1) {
ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "STOP");
cnt.countDown();
return false; // Stop listening.
}
else if ("PONG".equals(rcvMsg))
ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PING");
else
throw new IgniteException("Received unexpected message: " + rcvMsg);
cnt.countDown();
return true; // Continue listening.
});
// Serve!
ignite.message(nodeB).send(null, "PING");
// Wait til the game is over.
try {
cnt.await();
}
catch (InterruptedException e) {
System.err.println("Hm... let us finish the game!\n" + e);
}
}
}
}
这是一个 Apache Ignite 的 “Ping-Pong” 消息通信示例,形象地模拟了两个节点之间像打乒乓球一样的消息来回交互过程。相比前一个例子,这个示例更注重交互性和状态控制。
我们来逐步深入理解这段代码的结构、逻辑和设计思想。
🎯 一、整体目标
实现一个分布式“乒乓游戏”:
- 本地节点(A)向一个远程随机节点(B)发送
"PING"
。 - 节点 B 收到后回复
"PONG"
。 - 节点 A 再回复
"PING"
…… - 如此往复,直到达到预设次数(10 次)。
- 最后一次,A 发送
"STOP"
,B 收到后停止监听。
✅ 这是一个典型的 请求-响应式通信模型(Request-Reply Pattern),用于演示节点间的双向异步消息交互。
📦 二、关键类与接口回顾
类/接口 | 作用 |
---|---|
Ignite |
启动并代表一个集群节点 |
Ignition.start(...) |
启动当前节点 |
ClusterGroup |
表示一组集群节点 |
Ignite.message(...) |
获取 IgniteMessaging 实例用于消息收发 |
remoteListen(...) |
在远程节点上注册消息监听器(借助 P2P 类加载) |
localListen(...) |
在本地节点注册监听器 |
CountDownLatch |
用于主线程等待游戏结束 |
🔍 三、代码逐段解析
1️⃣ 启动节点 & 检查拓扑
try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2))
return;
- 启动本地节点。
- 确保集群中至少有两个节点(本节点 + 一个远程节点)。
2️⃣ 选择“对手”节点
ClusterGroup nodeB = ignite.cluster().forRemotes().forRandom();
- 从所有远程节点中随机选一个作为通信对象(
nodeB
)。 - 整个“乒乓”游戏将在 本地节点 A 和 远程节点 B 之间进行。
3️⃣ 在远程节点 B 上设置监听器(“对手 AI”)
ignite.message(nodeB).remoteListen(null, (nodeId, rcvMsg) -> {
System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');
if ("PING".equals(rcvMsg)) {
ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PONG");
return true; // 继续监听
}
return false; // 收到非 PING 消息则停止监听
});
🔹 功能说明:
- 这个 Lambda 是在远程节点 B 上执行的(通过
remoteListen
部署过去)。 - 当 B 收到消息:
- 如果是
"PING"
→ 打印日志,并回复"PONG"
- 否则 → 停止监听(比如收到
"STOP"
)
- 如果是
⚠️ 注意:
null
作为 topic 表示“所有消息”或默认主题。
✅ 关键点:
- 远程节点的监听器是动态部署的,依赖于 P2P 类加载(Peer-to-Peer Class Loading)
- 所以远程节点必须用
example-ignite.xml
启动(开启peerClassLoadingEnabled="true"
)
4️⃣ 在本地节点 A 上设置监听器(“主控 AI”)
final CountDownLatch cnt = new CountDownLatch(MAX_PLAYS); // 10 次回合
ignite.message().localListen(null, (nodeId, rcvMsg) -> {
System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');
if (cnt.getCount() == 1) {
// 这是最后一次回合
ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "STOP");
cnt.countDown();
return false; // 不再监听
}
else if ("PONG".equals(rcvMsg)) {
// 收到 PONG,回击 PING
ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PING");
}
else {
throw new IgniteException("Received unexpected message: " + rcvMsg);
}
cnt.countDown();
return true; // 继续监听
});
🔹 功能说明:
这是本地节点的行为逻辑:
条件 | 行为 |
---|---|
收到 "PONG" |
回复 "PING" ,继续游戏 |
剩最后一次(cnt=1 ) |
回复 "STOP" ,然后退出监听 |
收到其他消息 | 抛出异常 |
💡
CountDownLatch
用来计数总共要处理 10 次消息(每次收到PONG
或STOP
都减一)。
5️⃣ 开始游戏:“发球!”
// Serve!
ignite.message(nodeB).send(null, "PING");
- 主动发起第一轮:向
nodeB
发送"PING"
,游戏开始!
6️⃣ 等待游戏结束
cnt.await();
- 主线程阻塞,直到
cnt
被减到 0(即完成了 10 次交互)。 - 如果被中断,会打印警告但不强制退出。
🔄 四、消息交互流程(时序图)
[本地节点 A] [远程节点 B]
│ │
│-------- "PING" --------------->│
│ │
│<------- "PONG" ----------------│
│ │
│-------- "PING" --------------->│
│ │
│<------- "PONG" ----------------│
│ │
... (重复直到第9次) ...
│
│-------- "PING" --------------->│
│ │
│<------- "PONG" ----------------│
│ │
│-------- "STOP" --------------->│
│ │
│ │ ← 监听器返回 false,停止
↓
cnt.countDown() → cnt=0 → await() 返回 → 游戏结束
📌 总共:
- A 发出:9 次
"PING"
+ 1 次"STOP"
= 10 次发送- A 收到:9 次
"PONG"
+ 1 次"STOP"
(来自 B 的最后一次回应?不,STOP 是 A 主动发的)实际上
cnt
是对 A 收到的消息计数,共 10 次(9×PONG + 1×STOP 回执?但 STOP 不一定有回复)
⚠️ 注意:
"STOP"
是 A 发给 B 的,B 不会再回复。所以cnt
实际是对 A 收到的PONG
消息计数(9次)+ 最后一次处理STOP
的逻辑触发countDown()
。
更准确地说:cnt
是每处理一次消息就 countDown()
,共 10 次。
🧠 五、设计亮点与技巧
特性 | 说明 |
---|---|
✅ 状态驱动交互 | 利用 cnt.getCount() == 1 判断是否最后一轮 |
✅ 双向通信 | A 和 B 都能收发消息,形成闭环 |
✅ 动态远程监听 | 使用 remoteListen 在远端部署行为逻辑 |
✅ 无主题通信 | 使用 null topic,简化通信模型 |
✅ 优雅终止 | 通过 "STOP" 消息通知对方结束,避免无限循环 |
⚠️ 六、注意事项
P2P 类加载必须开启
- 远程节点必须使用
examples/config/example-ignite.xml
启动 - 否则
remoteListen
的 Lambda 无法序列化和执行
- 远程节点必须使用
消息不保证可靠送达
- Ignite Messaging 是“最多一次”(at-most-once)
- 网络问题可能导致消息丢失,进而导致
latch
永不结束
Lambda 序列化限制
- 匿名内部类或 Lambda 必须可序列化
- 避免引用外部不可序列化的对象
资源清理
remoteListen
返回一个UUID
,可用于后续调用stopRemoteListen(UUID)
清理- 否则监听器会一直存在
🧩 七、可以改进的地方(思考题)
问题 | 改进建议 |
---|---|
"STOP" 是单向通知,B 是否真的停止了? |
可让 B 回复 "GAME_OVER" 确认 |
消息丢失怎么办? | 加入超时重试机制 |
只和一个节点玩没意思 | 可广播给多个节点,玩“群 ping” |
使用 null topic 不够清晰 |
定义明确的 topic,如 "PING_PONG" |
✅ 八、实际应用场景
虽然看起来像个“玩具”,但这种模式在真实系统中有广泛应用:
场景 | 类比 |
---|---|
健康检查(Heartbeat) | 定期 PING-PONG 检测节点存活 |
分布式协调 | 主节点通知从节点开始/停止任务 |
状态同步 | 节点间交换状态信息 |
轻量级 RPC | 实现简单的远程调用协议 |
🧾 九、总结
这个 MessagingPingPongExample
是一个非常经典的 分布式交互式通信演示,它展示了:
核心能力 | 说明 |
---|---|
🔄 双向消息通信 | 节点之间可以互相发送和接收消息 |
🧠 动态行为部署 | 使用 remoteListen 在远端“安装”逻辑 |
⏳ 异步 + 同步控制 | 用 CountDownLatch 控制主线程等待 |
🎮 状态驱动交互 | 根据计数决定下一步动作(发 PING 还是 STOP) |
🌐 分布式协调原型 | 可扩展为更复杂的集群协作机制 |
💬 一句话总结:
这是一个基于 Apache Ignite 消息系统的“乒乓”游戏,通过remoteListen
和localListen
实现跨节点的交互式通信,展示了分布式系统中请求-响应模式的基本实现方式。
如果你想进一步探索:
- 如何实现“多播 Ping”?
- 如何加入超时和重试?
- 如何用缓存事件替代消息?
欢迎继续提问!