分布式MQTT客户端看门狗机制设计与实现

发布于:2025-06-13 ⋅ 阅读:(20) ⋅ 点赞:(0)

2. 看门狗调度机制

  • 背景与问题

    面临的挑战

    在传统的微服务集群部署中,每个服务实例都可能需要连接MQTT服务器处理设备消息。这会带来几个问题:

    • 消息重复处理:多个节点同时订阅同一个Topic,导致同一条消息被处理多次
    • 资源浪费:每个节点都维护MQTT连接,占用不必要的网络和内存资源
    • 状态不一致:多个节点并发处理设备指令,可能导致设备状态混乱

    业务需求

    对于设备管理服务,我们需要确保:

    • 每条MQTT消息只被处理一次
    • 服务具备高可用性,单节点故障不影响消息处理
    • 系统能够自动进行故障恢复

    解决方案设计

    核心思想

    通过分布式锁 + 看门狗的机制,确保在任意时刻只有一个节点负责MQTT连接和消息处理,同时保证服务的高可用性。

  • @Component
    public class MqttClientStart implements ApplicationRunner, DisposableBean {
            private static final String MQTT_LOCK_KEY =        "Service:Mqtt:Consumers:Client:Watchdog:Lock";
        private static final Long LOCK_TIMEOUT = 120L;  // 锁超时时间
        private static final int LOCK_RENEW_INTERVAL = 100;  // 续期间隔
        
        private final String nodeId = RequestUtils.getHostname();  // 节点唯一标识
        private final AtomicBoolean watchdogRunning = new AtomicBoolean(false);
        private final AtomicBoolean mqttInitialized = new AtomicBoolean(false);
    }

    设计要点

  • 使用主机名作为节点唯一标识
  • 锁超时时间120秒,续期间隔100秒,避免网络抖动导致的锁丢失
  • 通过AtomicBoolean确保状态的线程安全
    private void startWatchdog() {
        watchdogExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "mqtt-watchdog-" + nodeId);
            thread.setDaemon(true);
            
            // 关键:设置未捕获异常处理器
            thread.setUncaughtExceptionHandler((t, ex) -> {
                log.error("Uncaught exception in watchdog thread {}: {}", 
                    t.getName(), ex.getMessage(), ex);
                handleWatchdogFailure(ex);
            });
            
            return thread;
        });
        
        watchdogExecutor.scheduleAtFixedRate(this::watchdogTask, 1, 
            LOCK_RENEW_INTERVAL, TimeUnit.SECONDS);
    }

设计亮点

  • 单线程调度器避免并发问题
  • 守护线程确保不阻塞应用关闭
  • 完善的异常处理机制

3. 核心业务逻辑

 

private void watchdogTask() {
    try {
        boolean hasLock = myRedisLock.tryReentrantLock(MQTT_LOCK_KEY, nodeId, LOCK_TIMEOUT);
        
        if (hasLock) {
            // 获得锁且未初始化 -> 初始化MQTT客户端
            if (!mqttInitialized.get()) {
                log.info("Node {} acquired lock. Initializing MQTT client...", nodeId);
                initializeMqttClient();
            }
        } else {
            // 失去锁且已初始化 -> 关闭MQTT客户端
            if (mqttInitialized.get()) {
                log.info("Node {} lost lock. Shutting down MQTT client...", nodeId);
                shutdownMqttClient();
            }
        }
    } catch (Exception e) {
        log.error("Error in MQTT watchdog task:", e);
        if (mqttInitialized.get()) {
            shutdownMqttClient();  // 异常时确保资源清理
        }
    }
}

核心逻辑

  • 持有锁 + 未初始化 → 启动MQTT客户端
  • 失去锁 + 已初始化 → 关闭MQTT客户端
  • 异常情况下确保资源清理

4. 故障恢复机制

private void handleWatchdogFailure(Throwable ex) {
    watchdogRunning.set(false);
    
    // 异步延迟重启
    CompletableFuture.runAsync(() -> {
        try {
            Thread.sleep(5000);  // 延迟5秒重启
            if (watchdogExecutor != null) {
                watchdogExecutor.shutdown();
            }
            startWatchdog();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

容错设计

  • 异常发生时自动重启看门狗
  • 延迟重启避免频繁失败
  • 异步处理不阻塞当前线程

运行流程

正常运行流程

  1. 应用启动:各节点启动看门狗线程
  2. 锁竞争:各节点尝试获取Redis分布式锁
  3. 角色确定:获得锁的节点成为Active,其他为Standby
  4. MQTT管理:Active节点初始化MQTT客户端,开始处理消息
  5. 锁续期:Active节点定期续期锁,Standby节点继续尝试获取锁

故障切换流程

  1. 故障检测:Active节点故障,停止锁续期
  2. 锁释放:Redis锁超时自动释放(120秒后)
  3. 角色切换:Standby节点获得锁,升级为Active
  4. 服务恢复:新Active节点初始化MQTT客户端,恢复消息处理

优势与权衡

主要优势

高可用性

  • 单节点故障时自动切换,服务不中断
  • 故障恢复时间可控(最多120秒)

数据一致性

  • 确保消息唯一性处理
  • 避免重复操作和状态冲突

运维友好

  • 自动故障检测和恢复
  • 完善的日志记录便于问题排查

设计权衡

性能方面

  • 牺牲了并发处理能力
  • MQTT处理能力无法水平扩展

资源利用

  • 其他节点的MQTT处理资源闲置
  • 可能造成负载不均

    适用场景

    这种设计适合以下场景:

    • 对消息处理一致性要求较高
    • MQTT消息量不大,单节点可以处理
    • 更重视可用性而非性能