MQTT封装
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅(Pub/Sub)模式的轻量级消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)设备通信。。
项目中导入mqtt
使用yarn安装
yarn add mqtt
使用npm安装
npm install mqtt
封装MQTT操作类
为了保持MQTT在全局的唯一性,我们需要封装一个MQTT的单例操作类来统一管理MQTT相关的操作(连接、断开、订阅、重连等)
新建 MQTTService.js
import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo, logWarn} from "@utils/LogUtils";
let _instance = null // 单例实例
// 初始化 MQTT 工具的状态
function initMqtt() {
if (_instance) {
return _instance // 如果实例已存在,直接返回
}
let client = null
let connectStatus = 'Connect'
let isSubed = false // 订阅状态
let initHost = "ws://xxx.xxx.xxx.xxx:xxxx/mqtt"
let initOptions = {
clientId: '',
username: '',
password: ''
}
_instance = { // 将实例赋值给 _instance
getConnectStatus: () => connectStatus,
getIsSubscribedStatus: () => isSubed, // 获取订阅状态
//全局MQTT连接
handleConnect: function () {
if (connectStatus !== 'Connect') {
logInfo("MQTT已连接")
return
}
_instance.handleCustomConnect(initHost, initOptions)
},
handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {
connectStatus = 'Connecting'
client = mqtt.connect(host, mqttOptions)
logInfo('------> [ Connecting ] ------>', host, mqttOptions)
if (client) {
client.on('connect', () => {
connectStatus = 'Connected'
logInfo('------> [ Connected ]')
if (autoSub) {
//默认订阅主题
_instance.handleSubscribe(
[
'MqttTopic1',
'MqttTopic2',
'MqttTopic3',
],
0
)
}
if (onConnect) onConnect()
});
client.on('error', (err) => {
logError('------> [ Connection error ]', err)
if (onError) onError(err)
client.end()
});
client.on('reconnect', () => {
connectStatus = 'Reconnecting'
logError('------> [ Reconnecting ]')
if (onReconnect) onReconnect()
});
client.on('message', (topic, message) => {
logInfo('------> [ topic ] ------>', topic)
logInfo('------> [ message ] ---->', message.toString())
const payload = {topic, message: message.toString()}
//处理mqtt消息
dealMqttMsg(topic, message.toString())
if (onMessage) onMessage(payload)
});
}
},
handleSubscribe: (topic, qos, callback) => {
if (!client) {
throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');
}
client.subscribe(topic, {qos}, (err) => {
if (err) {
logError('Subscribe failed:', err)
return
}
isSubed = true// 更新订阅状态为已订阅
logInfo('------> [ 订阅主题成功 ]')
if (callback) callback()
});
},
handleUnsub: (topic, qos, callback) => {
if (client) {
client.unsubscribe(topic, {qos}, (error) => {
if (error) {
logError('Unsubscribe error', error)
return
}
isSubed = false // 更新订阅状态为未订阅
if (callback) callback()
})
}
},
handlePublish: (pubRecord) => {
if (!client) {
logError('MQTT 客户端未初始化,无法发布消息')
return
}
const {topic, qos, payload} = pubRecord
client.publish(topic, payload, {qos}, (error) => {
if (error) {
logError('------> [ 发布消息失败 ] ------>', error)
throw new Error(`发布消息失败: ${error.message || '未知错误'}`)
}
})
},
handleDisconnect: (callback) => {
if (client) {
try {
// 移除所有事件监听器
client.removeAllListeners()
//
client.end(false, () => {
connectStatus = 'Connect' // 确保断开连接后状态正确更新
client = null
logInfo('------> [ disconnect ]')
if (callback) callback()
})
} catch (error) {
connectStatus = 'Connect' // 确保断开连接后状态正确更新
logInfo('------> [ disconnect error ] ------>', error)
}
}
},
}
return _instance
}
export default initMqtt;
消息分发
工具类的封装
全局MQTT连接到服务器之后,会通过订阅主题来接收推送的消息,推送的消息是 topic(主题)+ message(消息内容)组成。我们需要根据不同的topic主题来进行消息的分发,所以需要封装一个消息分发的工具类。
新建 EventEmitter.js
import {pull} from 'lodash';
/**
* 自定义事件触发器类,用于管理事件监听和触发
*/
class EventEmitter {
/**
* 构造函数,初始化事件存储对象
*/
constructor() {
this._events = {}; // 存储事件名与回调函数列表的映射
}
/**
* 获取指定事件的回调函数列表,若不存在则创建空数组
* @param {string} event - 事件名称
* @returns {Function[]} 当前事件的回调函数列表
*/
_getFns(event) {
return this._events[event] || (this._events[event] = []);
}
/**
* 监听指定事件,添加回调函数
* @param {string} event - 事件名称
* @param {Function} cb - 回调函数
*/
on(event, cb) {
const fns = this._getFns(event);
fns.push(cb); // 将回调加入该事件的队列中
}
/**
* 移除指定事件的某个回调函数或整个事件的所有回调
* @param {string} event - 事件名称
* @param {Function} [cb] - 要移除的回调函数,不传则清空整个事件
*/
off(event, cb) {
if (cb) {
const fns = this._getFns(event);
pull(fns, cb); // 使用 lodash 的 pull 方法从数组中移除指定回调
} else {
delete this._events[event]; // 不传回调时删除整个事件键
}
}
/**
* 绑定只执行一次的事件监听器
* @param {string} event - 事件名称
* @param {Function} cb - 回调函数
*/
once(event, cb) {
const fn2 = (e) => {
this.off(event, fn2); // 执行后立即解绑自身
cb(e); // 执行原始回调
};
this.on(event, fn2); // 绑定包装后的回调
}
/**
* 同步触发指定事件,依次执行所有回调
* @param {string} event - 事件名称
* @param {*} [param] - 传递给回调函数的参数
*/
emit(event, param) {
const fns = this._getFns(event);
for (let i = 0; i < fns.length; i++) {
const fn = fns[i];
fn(param); // 同步执行每个回调函数
}
}
/**
* 异步触发指定事件,返回一个Promise
* 注意:此方法仅触发第一个回调并返回其 Promise 结果
* @param {string} event - 事件名称
* @param {*} [param] - 传递给回调函数的参数
* @returns {Promise} 返回第一个回调执行后的 Promise
*/
invoke(event, param) {
const fns = this._getFns(event);
for (let i = 0; i < fns.length; i++) {
const fn = fns[i];
return new Promise((resolve, reject) => {
resolve(fn(param)); // 将第一个回调的结果封装为 Promise
});
}
return Promise.reject(); // 如果没有回调,则返回拒绝状态
}
}
export default EventEmitter;
消息分发处理
新建 Events.js
import EventEmitter from './EventEmitter.js';
import {useEffect} from 'react';
const events = new EventEmitter();
export default events;
/**
* 订阅事件总线
*
* @param event - 事件名称
* @param callback - 回调函数
*
* @example
* useEventBus('自定义事件名', (pushInfo) => {
* console.log(pushInfo);
* });
*/
export function useEventBus(event, callback) {
useEffect(() => {
events.on(event, callback);
return () => {
events.off(event, callback);
};
});
}
/**
* mqtt消息处理
* @param topic
* @param message
*/
export function dealMqttMsg(topic, message) {
switch (topic) {
//网关配置相关订阅
case 'MqttTopic1':
//发送消息
events.emit('MqttTopic1', message)
break;
case 'MqttTopic2':
//发送消息
events.emit('MqttTopic2', message)
break;
case 'MqttTopic3':
//发送消息
events.emit('MqttTopic3', message)
break;
}
}
消息订阅
我们可以在页面上通过订阅的方式来进行消息的接收。
/**
* MQTT消息订阅
*/
useEventBus('MqttTopic1', (message) => {
console.log(message)
})
MQTT使用
初始化
如果需要在全局进行MQTT连接,则可以在App.js里面进行mqtt的连接和断开,或者在登陆、登出进行同样可以。
useEffect(() => {
if (initMqtt().getConnectStatus() === "Connect") {
//连接MQTT
initMqtt().handleConnect()
}
return () => {
//组件销毁时关闭连接
initMqtt().handleDisconnect()
}
}, [])
心跳
新建 HeartBeatUtils.js
let heartTimerId = null;
export const startHeartBeat = (intervalTime, intervalMethod) => {
if (!intervalTime || !intervalMethod) {
console.warn('Invalid parameters for startHeartBeat');
return null;
}
// 清除已有的定时器
clearHeartBeat();
// 设置新的定时器
heartTimerId = setInterval(async () => {
intervalMethod()
}, intervalTime);
//
intervalMethod()
};
export const clearHeartBeat = () => {
if (heartTimerId) {
clearTimeout(heartTimerId);
heartTimerId = null;
}
};
可以在连接MQTT的回调中使用,MQTT连接成功后开启心跳定时器,发送心跳消息,在MQTT断开连接时,手动停止心跳定时器即可。
handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {
connectStatus = 'Connecting'
client = mqtt.connect(host, mqttOptions)
logInfo('------> [ Connecting ] ------>', host, mqttOptions)
if (client) {
client.on('connect', () => {
connectStatus = 'Connected'
logInfo('------> [ Connected ]')
// 启动心跳定时器
startHeartBeat(1000 * 60, () => {
//实现具体的心跳方法(发送指定的心跳数据)
_instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})
})
//是否自动订阅主题
if (autoSub) {
//默认订阅主题
_instance.handleSubscribe(
[
'MqttTopic1',
'MqttTopic2',
'MqttTopic3',
],
0
)
}
if (onConnect) onConnect()
});
client.on('error', (err) => {
logError('------> [ Connection error ]', err)
if (onError) onError(err)
client.end()
});
client.on('reconnect', () => {
connectStatus = 'Reconnecting'
logError('------> [ Reconnecting ]')
if (onReconnect) onReconnect()
});
client.on('message', (topic, message) => {
logInfo('------> [ topic ] ------>', topic)
logInfo('------> [ message ] ---->', message.toString())
const payload = {topic, message: message.toString()}
//处理mqtt消息
dealMqttMsg(topic, message.toString())
if (onMessage) onMessage(payload)
});
}
}
handleDisconnect: (callback) => {
if (client) {
try {
// 移除所有事件监听器
client.removeAllListeners()
//
client.end(false, () => {
connectStatus = 'Connect' // 确保断开连接后状态正确更新
client = null
//移除心跳定时器
clearHeartBeat()
logInfo('------> [ disconnect ]')
if (callback) callback()
})
} catch (error) {
connectStatus = 'Connect' // 确保断开连接后状态正确更新
logInfo('------> [ disconnect error ] ------>', error)
}
}
}
完整的MQTT相关操作类如下所示:
import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo} from "@utils/LogUtils";
import {startHeartBeat} from "./util/HeartBeatUtils";
let _instance = null // 单例实例
// 初始化 MQTT 工具的状态
function initMqtt() {
if (_instance) {
return _instance // 如果实例已存在,直接返回
}
let client = null
let connectStatus = 'Connect'
let isSubed = false // 订阅状态
let initHost = "ws://XXX.XXX.X.X:XXXX/aiot/mqtt"
let initOptions = {
clientId: '',
username: '',
password: ''
}
_instance = {
getConnectStatus: () => connectStatus,
getIsSubscribedStatus: () => isSubed, // 获取订阅状态
//全局MQTT连接
handleConnect: function () {
if (connectStatus !== 'Connect') {
logInfo("MQTT已连接")
return
}
_instance.handleCustomConnect(initHost, initOptions)
},
handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {
connectStatus = 'Connecting'
client = mqtt.connect(host, mqttOptions)
logInfo('------> [ Connecting ] ------>', host, mqttOptions)
if (client) {
client.on('connect', () => {
connectStatus = 'Connected'
// 启动心跳定时器
startHeartBeat(1000 * 60, () => {
_instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})
})
logInfo('------> [ Connected ]')
if (autoSub) {
//默认订阅主题
_instance.handleSubscribe(
[
'MqttTopic1',
'MqttTopic2',
'MqttTopic3',
],
0
)
}
if (onConnect) onConnect()
});
client.on('error', (err) => {
logError('------> [ Connection error ]', err)
if (onError) onError(err)
client.end()
});
client.on('reconnect', () => {
connectStatus = 'Reconnecting'
logError('------> [ Reconnecting ]')
if (onReconnect) onReconnect()
});
client.on('message', (topic, message) => {
logInfo('------> [ topic ] ------>', topic)
logInfo('------> [ message ] ---->', message.toString())
const payload = {topic, message: message.toString()}
//处理mqtt消息
dealMqttMsg(topic, message.toString())
if (onMessage) onMessage(payload)
});
}
},
handleSubscribe: (topic, qos, callback) => {
if (!client) {
throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');
}
client.subscribe(topic, {qos}, (err) => {
if (err) {
logError('Subscribe failed:', err)
return
}
isSubed = true// 更新订阅状态为已订阅
logInfo('------> [ 订阅主题成功 ]')
if (callback) callback()
});
},
handleUnsub: (topic, qos, callback) => {
if (client) {
client.unsubscribe(topic, {qos}, (error) => {
if (error) {
logError('Unsubscribe error', error)
return
}
isSubed = false // 更新订阅状态为未订阅
if (callback) callback()
})
}
},
handlePublish: (pubRecord) => {
if (!client) {
logError('MQTT 客户端未初始化,无法发布消息')
return
}
const {topic, qos, payload} = pubRecord
client.publish(topic, payload, {qos}, (error) => {
if (error) {
logError('------> [ 发布消息失败 ] ------>', error)
throw new Error(`发布消息失败: ${error.message || '未知错误'}`)
}
})
},
handleDisconnect: (callback) => {
if (client) {
try {
// 移除所有事件监听器
client.removeAllListeners()
//
client.end(false, () => {
connectStatus = 'Connect' // 确保断开连接后状态正确更新
client = null
logInfo('------> [ disconnect ]')
if (callback) callback()
})
} catch (error) {
connectStatus = 'Connect' // 确保断开连接后状态正确更新
logInfo('------> [ disconnect error ] ------>', error)
}
}
},
}
return _instance
}
export default initMqtt;