Apache Ignite 的 Ping-Pong 消息通信示例

发布于:2025-08-08 ⋅ 阅读:(19) ⋅ 点赞:(0)
package org.apache.ignite.examples.messaging;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;

import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates simple message exchange between local and remote nodes.
 * <p>
 * To run this example you must have at least one remote node started.
 * <p>
 * Remote nodes should always be started with special configuration file which
 * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
 * <p>
 * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node
 * with {@code examples/config/example-ignite.xml} configuration.
 */
public class MessagingPingPongExample {
    /**
     * Executes example.
     *
     * @param args Command line arguments, none required.
     * @throws Exception If example execution failed.
     */
    public static void main(String[] args) throws Exception {
        // Game is played over the default ignite.
        try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
            if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2))
                return;

            System.out.println();
            System.out.println(">>> Messaging ping-pong example started.");

            // Pick random remote node as a partner.
            ClusterGroup nodeB = ignite.cluster().forRemotes().forRandom();

            // Note that both nodeA and nodeB will always point to
            // same nodes regardless of whether they were implicitly
            // serialized and deserialized on another node as part of
            // anonymous closure's state during its remote execution.

            // Set up remote player.
            ignite.message(nodeB).remoteListen(null, (nodeId, rcvMsg) -> {
                System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');

                if ("PING".equals(rcvMsg)) {
                    ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PONG");

                    return true; // Continue listening.
                }

                return false; // Unsubscribe.
            });

            int MAX_PLAYS = 10;

            final CountDownLatch cnt = new CountDownLatch(MAX_PLAYS);

            // Set up local player.
            ignite.message().localListen(null, (nodeId, rcvMsg) -> {
                System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');

                if (cnt.getCount() == 1) {
                    ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "STOP");

                    cnt.countDown();

                    return false; // Stop listening.
                }
                else if ("PONG".equals(rcvMsg))
                    ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PING");
                else
                    throw new IgniteException("Received unexpected message: " + rcvMsg);

                cnt.countDown();

                return true; // Continue listening.
            });

            // Serve!
            ignite.message(nodeB).send(null, "PING");

            // Wait til the game is over.
            try {
                cnt.await();
            }
            catch (InterruptedException e) {
                System.err.println("Hm... let us finish the game!\n" + e);
            }
        }
    }
}

这是一个 Apache Ignite“Ping-Pong” 消息通信示例,形象地模拟了两个节点之间像打乒乓球一样的消息来回交互过程。相比前一个例子,这个示例更注重交互性状态控制

我们来逐步深入理解这段代码的结构、逻辑和设计思想。


🎯 一、整体目标

实现一个分布式“乒乓游戏”:

  • 本地节点(A)向一个远程随机节点(B)发送 "PING"
  • 节点 B 收到后回复 "PONG"
  • 节点 A 再回复 "PING"……
  • 如此往复,直到达到预设次数(10 次)。
  • 最后一次,A 发送 "STOP",B 收到后停止监听。

✅ 这是一个典型的 请求-响应式通信模型(Request-Reply Pattern),用于演示节点间的双向异步消息交互


📦 二、关键类与接口回顾

类/接口 作用
Ignite 启动并代表一个集群节点
Ignition.start(...) 启动当前节点
ClusterGroup 表示一组集群节点
Ignite.message(...) 获取 IgniteMessaging 实例用于消息收发
remoteListen(...) 在远程节点上注册消息监听器(借助 P2P 类加载)
localListen(...) 在本地节点注册监听器
CountDownLatch 用于主线程等待游戏结束

🔍 三、代码逐段解析

1️⃣ 启动节点 & 检查拓扑

try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
    if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2))
        return;
  • 启动本地节点。
  • 确保集群中至少有两个节点(本节点 + 一个远程节点)。

2️⃣ 选择“对手”节点

ClusterGroup nodeB = ignite.cluster().forRemotes().forRandom();
  • 从所有远程节点中随机选一个作为通信对象(nodeB)。
  • 整个“乒乓”游戏将在 本地节点 A远程节点 B 之间进行。

3️⃣ 在远程节点 B 上设置监听器(“对手 AI”)

ignite.message(nodeB).remoteListen(null, (nodeId, rcvMsg) -> {
    System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');

    if ("PING".equals(rcvMsg)) {
        ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PONG");
        return true; // 继续监听
    }

    return false; // 收到非 PING 消息则停止监听
});
🔹 功能说明:
  • 这个 Lambda 是在远程节点 B 上执行的(通过 remoteListen 部署过去)。
  • 当 B 收到消息:
    • 如果是 "PING" → 打印日志,并回复 "PONG"
    • 否则 → 停止监听(比如收到 "STOP"

⚠️ 注意:null 作为 topic 表示“所有消息”或默认主题。

✅ 关键点:
  • 远程节点的监听器是动态部署的,依赖于 P2P 类加载(Peer-to-Peer Class Loading)
  • 所以远程节点必须用 example-ignite.xml 启动(开启 peerClassLoadingEnabled="true"

4️⃣ 在本地节点 A 上设置监听器(“主控 AI”)

final CountDownLatch cnt = new CountDownLatch(MAX_PLAYS); // 10 次回合

ignite.message().localListen(null, (nodeId, rcvMsg) -> {
    System.out.println("Received message [msg=" + rcvMsg + ", sender=" + nodeId + ']');

    if (cnt.getCount() == 1) {
        // 这是最后一次回合
        ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "STOP");
        cnt.countDown();
        return false; // 不再监听
    }
    else if ("PONG".equals(rcvMsg)) {
        // 收到 PONG,回击 PING
        ignite.message(ignite.cluster().forNodeId(nodeId)).send(null, "PING");
    }
    else {
        throw new IgniteException("Received unexpected message: " + rcvMsg);
    }

    cnt.countDown();
    return true; // 继续监听
});
🔹 功能说明:

这是本地节点的行为逻辑:

条件 行为
收到 "PONG" 回复 "PING",继续游戏
剩最后一次(cnt=1 回复 "STOP",然后退出监听
收到其他消息 抛出异常

💡 CountDownLatch 用来计数总共要处理 10 次消息(每次收到 PONGSTOP 都减一)。


5️⃣ 开始游戏:“发球!”

// Serve!
ignite.message(nodeB).send(null, "PING");
  • 主动发起第一轮:向 nodeB 发送 "PING",游戏开始!

6️⃣ 等待游戏结束

cnt.await();
  • 主线程阻塞,直到 cnt 被减到 0(即完成了 10 次交互)。
  • 如果被中断,会打印警告但不强制退出。

🔄 四、消息交互流程(时序图)

[本地节点 A]                     [远程节点 B]
     │                                │
     │-------- "PING" --------------->│
     │                                │
     │<------- "PONG" ----------------│
     │                                │
     │-------- "PING" --------------->│
     │                                │
     │<------- "PONG" ----------------│
     │                                │
     ... (重复直到第9次) ...
     │
     │-------- "PING" --------------->│
     │                                │
     │<------- "PONG" ----------------│
     │                                │
     │-------- "STOP" --------------->│
     │                                │
     │                                │ ← 监听器返回 false,停止
     ↓
cnt.countDown() → cnt=0 → await() 返回 → 游戏结束

📌 总共:

  • A 发出:9 次 "PING" + 1 次 "STOP" = 10 次发送
  • A 收到:9 次 "PONG" + 1 次 "STOP"(来自 B 的最后一次回应?不,STOP 是 A 主动发的)

实际上 cnt 是对 A 收到的消息计数,共 10 次(9×PONG + 1×STOP 回执?但 STOP 不一定有回复)

⚠️ 注意:"STOP" 是 A 发给 B 的,B 不会再回复。所以 cnt 实际是对 A 收到的 PONG 消息计数(9次)+ 最后一次处理 STOP 的逻辑触发 countDown()

更准确地说:cnt 是每处理一次消息就 countDown(),共 10 次。


🧠 五、设计亮点与技巧

特性 说明
状态驱动交互 利用 cnt.getCount() == 1 判断是否最后一轮
双向通信 A 和 B 都能收发消息,形成闭环
动态远程监听 使用 remoteListen 在远端部署行为逻辑
无主题通信 使用 null topic,简化通信模型
优雅终止 通过 "STOP" 消息通知对方结束,避免无限循环

⚠️ 六、注意事项

  1. P2P 类加载必须开启

    • 远程节点必须使用 examples/config/example-ignite.xml 启动
    • 否则 remoteListen 的 Lambda 无法序列化和执行
  2. 消息不保证可靠送达

    • Ignite Messaging 是“最多一次”(at-most-once)
    • 网络问题可能导致消息丢失,进而导致 latch 永不结束
  3. Lambda 序列化限制

    • 匿名内部类或 Lambda 必须可序列化
    • 避免引用外部不可序列化的对象
  4. 资源清理

    • remoteListen 返回一个 UUID,可用于后续调用 stopRemoteListen(UUID) 清理
    • 否则监听器会一直存在

🧩 七、可以改进的地方(思考题)

问题 改进建议
"STOP" 是单向通知,B 是否真的停止了? 可让 B 回复 "GAME_OVER" 确认
消息丢失怎么办? 加入超时重试机制
只和一个节点玩没意思 可广播给多个节点,玩“群 ping”
使用 null topic 不够清晰 定义明确的 topic,如 "PING_PONG"

✅ 八、实际应用场景

虽然看起来像个“玩具”,但这种模式在真实系统中有广泛应用:

场景 类比
健康检查(Heartbeat) 定期 PING-PONG 检测节点存活
分布式协调 主节点通知从节点开始/停止任务
状态同步 节点间交换状态信息
轻量级 RPC 实现简单的远程调用协议

🧾 九、总结

这个 MessagingPingPongExample 是一个非常经典的 分布式交互式通信演示,它展示了:

核心能力 说明
🔄 双向消息通信 节点之间可以互相发送和接收消息
🧠 动态行为部署 使用 remoteListen 在远端“安装”逻辑
⏳ 异步 + 同步控制 CountDownLatch 控制主线程等待
🎮 状态驱动交互 根据计数决定下一步动作(发 PING 还是 STOP)
🌐 分布式协调原型 可扩展为更复杂的集群协作机制

💬 一句话总结
这是一个基于 Apache Ignite 消息系统的“乒乓”游戏,通过 remoteListenlocalListen 实现跨节点的交互式通信,展示了分布式系统中请求-响应模式的基本实现方式。


如果你想进一步探索:

  • 如何实现“多播 Ping”?
  • 如何加入超时和重试?
  • 如何用缓存事件替代消息?

欢迎继续提问!


网站公告

今日签到

点亮在社区的每一天
去签到