【React】MQTT + useEventBus 实现MQTT长连接以及消息分发

发布于:2025-07-13 ⋅ 阅读:(17) ⋅ 点赞:(0)

MQTT封装

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅(Pub/Sub)模式的轻量级消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)设备通信。。

项目中导入mqtt

使用yarn安装

yarn add mqtt

使用npm安装

npm install mqtt

封装MQTT操作类

为了保持MQTT在全局的唯一性,我们需要封装一个MQTT的单例操作类来统一管理MQTT相关的操作(连接、断开、订阅、重连等)
新建 MQTTService.js

import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo, logWarn} from "@utils/LogUtils";

let _instance = null // 单例实例

// 初始化 MQTT 工具的状态
function initMqtt() {
    if (_instance) {
        return _instance // 如果实例已存在,直接返回
    }

    let client = null
    let connectStatus = 'Connect'
    let isSubed = false // 订阅状态
    let initHost = "ws://xxx.xxx.xxx.xxx:xxxx/mqtt"
    let initOptions = {
        clientId: '',
        username: '',
        password: ''
    }

    _instance = { // 将实例赋值给 _instance

        getConnectStatus: () => connectStatus,

        getIsSubscribedStatus: () => isSubed, // 获取订阅状态

        //全局MQTT连接
        handleConnect: function () {
            if (connectStatus !== 'Connect') {
                logInfo("MQTT已连接")
                return
            }
            _instance.handleCustomConnect(initHost, initOptions)
        },

        handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {
            connectStatus = 'Connecting'
            client = mqtt.connect(host, mqttOptions)
            logInfo('------> [ Connecting ] ------>', host, mqttOptions)

            if (client) {
                client.on('connect', () => {
                    connectStatus = 'Connected'
                    logInfo('------> [ Connected ]')
                    if (autoSub) {
                        //默认订阅主题
                        _instance.handleSubscribe(
                            [
                                'MqttTopic1',
                                'MqttTopic2',
                                'MqttTopic3',
                            ],
                            0
                        )
                    }
                    if (onConnect) onConnect()
                });

                client.on('error', (err) => {
                    logError('------> [ Connection error ]', err)
                    if (onError) onError(err)
                    client.end()
                });

                client.on('reconnect', () => {
                    connectStatus = 'Reconnecting'
                    logError('------> [ Reconnecting ]')
                    if (onReconnect) onReconnect()
                });

                client.on('message', (topic, message) => {
                    logInfo('------> [ topic ] ------>', topic)
                    logInfo('------> [ message ] ---->', message.toString())
                    const payload = {topic, message: message.toString()}
                    //处理mqtt消息
                    dealMqttMsg(topic, message.toString())
                    if (onMessage) onMessage(payload)
                });
            }
        },

        handleSubscribe: (topic, qos, callback) => {
            if (!client) {
                throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');
            }
            client.subscribe(topic, {qos}, (err) => {
                if (err) {
                    logError('Subscribe failed:', err)
                    return
                }
                isSubed = true// 更新订阅状态为已订阅
                logInfo('------> [ 订阅主题成功 ]')
                if (callback) callback()
            });
        },

        handleUnsub: (topic, qos, callback) => {
            if (client) {
                client.unsubscribe(topic, {qos}, (error) => {
                    if (error) {
                        logError('Unsubscribe error', error)
                        return
                    }
                    isSubed = false // 更新订阅状态为未订阅
                    if (callback) callback()
                })
            }
        },

        handlePublish: (pubRecord) => {
            if (!client) {
                logError('MQTT 客户端未初始化,无法发布消息')
                return
            }
            const {topic, qos, payload} = pubRecord
            client.publish(topic, payload, {qos}, (error) => {
                if (error) {
                    logError('------> [ 发布消息失败 ] ------>', error)
                    throw new Error(`发布消息失败: ${error.message || '未知错误'}`)
                }
            })
        },

        handleDisconnect: (callback) => {
            if (client) {
                try {
                    // 移除所有事件监听器
                    client.removeAllListeners()
                    //
                    client.end(false, () => {
                        connectStatus = 'Connect' // 确保断开连接后状态正确更新
                        client = null
                        logInfo('------> [ disconnect ]')
                        if (callback) callback()
                    })
                } catch (error) {
                    connectStatus = 'Connect' // 确保断开连接后状态正确更新
                    logInfo('------> [ disconnect error ] ------>', error)
                }
            }
        },
    }

    return _instance
}

export default initMqtt;

消息分发

工具类的封装

全局MQTT连接到服务器之后,会通过订阅主题来接收推送的消息,推送的消息是 topic(主题)+ message(消息内容)组成。我们需要根据不同的topic主题来进行消息的分发,所以需要封装一个消息分发的工具类。
新建 EventEmitter.js

import {pull} from 'lodash';

/**
 * 自定义事件触发器类,用于管理事件监听和触发
 */
class EventEmitter {
    /**
     * 构造函数,初始化事件存储对象
     */
    constructor() {
        this._events = {}; // 存储事件名与回调函数列表的映射
    }

    /**
     * 获取指定事件的回调函数列表,若不存在则创建空数组
     * @param {string} event - 事件名称
     * @returns {Function[]} 当前事件的回调函数列表
     */
    _getFns(event) {
        return this._events[event] || (this._events[event] = []);
    }

    /**
     * 监听指定事件,添加回调函数
     * @param {string} event - 事件名称
     * @param {Function} cb - 回调函数
     */
    on(event, cb) {
        const fns = this._getFns(event);
        fns.push(cb); // 将回调加入该事件的队列中
    }

    /**
     * 移除指定事件的某个回调函数或整个事件的所有回调
     * @param {string} event - 事件名称
     * @param {Function} [cb] - 要移除的回调函数,不传则清空整个事件
     */
    off(event, cb) {
        if (cb) {
            const fns = this._getFns(event);
            pull(fns, cb); // 使用 lodash 的 pull 方法从数组中移除指定回调
        } else {
            delete this._events[event]; // 不传回调时删除整个事件键
        }
    }

    /**
     * 绑定只执行一次的事件监听器
     * @param {string} event - 事件名称
     * @param {Function} cb - 回调函数
     */
    once(event, cb) {
        const fn2 = (e) => {
            this.off(event, fn2); // 执行后立即解绑自身
            cb(e); // 执行原始回调
        };
        this.on(event, fn2); // 绑定包装后的回调
    }

    /**
     * 同步触发指定事件,依次执行所有回调
     * @param {string} event - 事件名称
     * @param {*} [param] - 传递给回调函数的参数
     */
    emit(event, param) {
        const fns = this._getFns(event);
        for (let i = 0; i < fns.length; i++) {
            const fn = fns[i];
            fn(param); // 同步执行每个回调函数
        }
    }

    /**
     * 异步触发指定事件,返回一个Promise
     * 注意:此方法仅触发第一个回调并返回其 Promise 结果
     * @param {string} event - 事件名称
     * @param {*} [param] - 传递给回调函数的参数
     * @returns {Promise} 返回第一个回调执行后的 Promise
     */
    invoke(event, param) {
        const fns = this._getFns(event);
        for (let i = 0; i < fns.length; i++) {
            const fn = fns[i];
            return new Promise((resolve, reject) => {
                resolve(fn(param)); // 将第一个回调的结果封装为 Promise
            });
        }
        return Promise.reject(); // 如果没有回调,则返回拒绝状态
    }
}

export default EventEmitter;

消息分发处理

新建 Events.js

import EventEmitter from './EventEmitter.js';
import {useEffect} from 'react';

const events = new EventEmitter();

export default events;

/**
 * 订阅事件总线
 *
 * @param  event - 事件名称
 * @param  callback - 回调函数
 *
 * @example
 * useEventBus('自定义事件名', (pushInfo) => {
 *   console.log(pushInfo);
 * });
 */
export function useEventBus(event, callback) {
    useEffect(() => {
        events.on(event, callback);
        return () => {
            events.off(event, callback);
        };
    });
}

/**
 * mqtt消息处理
 * @param topic
 * @param message
 */
export function dealMqttMsg(topic, message) {
    switch (topic) {
        //网关配置相关订阅
        case 'MqttTopic1':
            //发送消息
            events.emit('MqttTopic1', message)
            break;
        case 'MqttTopic2':
            //发送消息
            events.emit('MqttTopic2', message)
            break;
        case 'MqttTopic3':
            //发送消息
            events.emit('MqttTopic3', message)
            break;
    }
}

消息订阅

我们可以在页面上通过订阅的方式来进行消息的接收。


    /**
     * MQTT消息订阅
     */
    useEventBus('MqttTopic1', (message) => {
       console.log(message)
    })

MQTT使用

初始化

如果需要在全局进行MQTT连接,则可以在App.js里面进行mqtt的连接和断开,或者在登陆、登出进行同样可以。



    useEffect(() => {
        if (initMqtt().getConnectStatus() === "Connect") {
            //连接MQTT
            initMqtt().handleConnect()
        }
        return () => {
            //组件销毁时关闭连接
            initMqtt().handleDisconnect()
        }
    }, [])

心跳

新建 HeartBeatUtils.js

let heartTimerId = null;

export const startHeartBeat = (intervalTime, intervalMethod) => {
    if (!intervalTime || !intervalMethod) {
        console.warn('Invalid parameters for startHeartBeat');
        return null;
    }
    // 清除已有的定时器
    clearHeartBeat();
    // 设置新的定时器
    heartTimerId = setInterval(async () => {
        intervalMethod()
    }, intervalTime);
    //
    intervalMethod()
};

export const clearHeartBeat = () => {
    if (heartTimerId) {
        clearTimeout(heartTimerId);
        heartTimerId = null;
    }
};

可以在连接MQTT的回调中使用,MQTT连接成功后开启心跳定时器,发送心跳消息,在MQTT断开连接时,手动停止心跳定时器即可。


        handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {
            connectStatus = 'Connecting'
            client = mqtt.connect(host, mqttOptions)
            logInfo('------> [ Connecting ] ------>', host, mqttOptions)

            if (client) {
                client.on('connect', () => {
                    connectStatus = 'Connected'
                    logInfo('------> [ Connected ]')
			        // 启动心跳定时器
			        startHeartBeat(1000 * 60, () => {
			        	//实现具体的心跳方法(发送指定的心跳数据)
                        _instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})
			        })
			        //是否自动订阅主题
                    if (autoSub) {
                        //默认订阅主题
                        _instance.handleSubscribe(
                            [
                                'MqttTopic1',
                                'MqttTopic2',
                                'MqttTopic3',
                            ],
                            0
                        )
                    }
                    if (onConnect) onConnect()
                });

                client.on('error', (err) => {
                    logError('------> [ Connection error ]', err)
                    if (onError) onError(err)
                    client.end()
                });

                client.on('reconnect', () => {
                    connectStatus = 'Reconnecting'
                    logError('------> [ Reconnecting ]')
                    if (onReconnect) onReconnect()
                });

                client.on('message', (topic, message) => {
                    logInfo('------> [ topic ] ------>', topic)
                    logInfo('------> [ message ] ---->', message.toString())
                    const payload = {topic, message: message.toString()}
                    //处理mqtt消息
                    dealMqttMsg(topic, message.toString())
                    if (onMessage) onMessage(payload)
                });
            }
        }

        handleDisconnect: (callback) => {
            if (client) {
                try {
                    // 移除所有事件监听器
                    client.removeAllListeners()
                    //
                    client.end(false, () => {
                        connectStatus = 'Connect' // 确保断开连接后状态正确更新
                        client = null
                        //移除心跳定时器
                        clearHeartBeat()
                        logInfo('------> [ disconnect ]')
                        if (callback) callback()
                    })
                } catch (error) {
                    connectStatus = 'Connect' // 确保断开连接后状态正确更新
                    logInfo('------> [ disconnect error ] ------>', error)
                }
            }
        }
    

完整的MQTT相关操作类如下所示:

import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo} from "@utils/LogUtils";
import {startHeartBeat} from "./util/HeartBeatUtils";

let _instance = null // 单例实例

// 初始化 MQTT 工具的状态
function initMqtt() {
    if (_instance) {
        return _instance // 如果实例已存在,直接返回
    }

    let client = null
    let connectStatus = 'Connect'
    let isSubed = false // 订阅状态
    let initHost = "ws://XXX.XXX.X.X:XXXX/aiot/mqtt"
    let initOptions = {
        clientId: '',
        username: '',
        password: ''
    }

    _instance = {

        getConnectStatus: () => connectStatus,

        getIsSubscribedStatus: () => isSubed, // 获取订阅状态

        //全局MQTT连接
        handleConnect: function () {
            if (connectStatus !== 'Connect') {
                logInfo("MQTT已连接")
                return
            }
            _instance.handleCustomConnect(initHost, initOptions)
        },

        handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {
            connectStatus = 'Connecting'
            client = mqtt.connect(host, mqttOptions)
            logInfo('------> [ Connecting ] ------>', host, mqttOptions)

            if (client) {
                client.on('connect', () => {
                    connectStatus = 'Connected'
                    // 启动心跳定时器
                    startHeartBeat(1000 * 60, () => {
                        _instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})
                    })
                    logInfo('------> [ Connected ]')
                    if (autoSub) {
                        //默认订阅主题
                        _instance.handleSubscribe(
                            [
                                'MqttTopic1',
                                'MqttTopic2',
                                'MqttTopic3',
                            ],
                            0
                        )
                    }
                    if (onConnect) onConnect()
                });

                client.on('error', (err) => {
                    logError('------> [ Connection error ]', err)
                    if (onError) onError(err)
                    client.end()
                });

                client.on('reconnect', () => {
                    connectStatus = 'Reconnecting'
                    logError('------> [ Reconnecting ]')
                    if (onReconnect) onReconnect()
                });

                client.on('message', (topic, message) => {
                    logInfo('------> [ topic ] ------>', topic)
                    logInfo('------> [ message ] ---->', message.toString())
                    const payload = {topic, message: message.toString()}
                    //处理mqtt消息
                    dealMqttMsg(topic, message.toString())
                    if (onMessage) onMessage(payload)
                });
            }
        },

        handleSubscribe: (topic, qos, callback) => {
            if (!client) {
                throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');
            }
            client.subscribe(topic, {qos}, (err) => {
                if (err) {
                    logError('Subscribe failed:', err)
                    return
                }
                isSubed = true// 更新订阅状态为已订阅
                logInfo('------> [ 订阅主题成功 ]')
                if (callback) callback()
            });
        },

        handleUnsub: (topic, qos, callback) => {
            if (client) {
                client.unsubscribe(topic, {qos}, (error) => {
                    if (error) {
                        logError('Unsubscribe error', error)
                        return
                    }
                    isSubed = false // 更新订阅状态为未订阅
                    if (callback) callback()
                })
            }
        },

        handlePublish: (pubRecord) => {
            if (!client) {
                logError('MQTT 客户端未初始化,无法发布消息')
                return
            }
            const {topic, qos, payload} = pubRecord
            client.publish(topic, payload, {qos}, (error) => {
                if (error) {
                    logError('------> [ 发布消息失败 ] ------>', error)
                    throw new Error(`发布消息失败: ${error.message || '未知错误'}`)
                }
            })
        },

        handleDisconnect: (callback) => {
            if (client) {
                try {
                    // 移除所有事件监听器
                    client.removeAllListeners()
                    //
                    client.end(false, () => {
                        connectStatus = 'Connect' // 确保断开连接后状态正确更新
                        client = null
                        logInfo('------> [ disconnect ]')
                        if (callback) callback()
                    })
                } catch (error) {
                    connectStatus = 'Connect' // 确保断开连接后状态正确更新
                    logInfo('------> [ disconnect error ] ------>', error)
                }
            }
        },
    }

    return _instance
}

export default initMqtt;



网站公告

今日签到

点亮在社区的每一天
去签到