使用版本
"mqtt": "^5.8.0",
安装指令
npm install mqtt --save
------
yarn add mqtt
介绍mqtt
配置
connection: {
protocol: "ws",
host: "broker.emqx.io",
port: 8083,
endpoint: "/mqtt",
clean: true,
connectTimeout: 30 * 1000, // ms
reconnectPeriod: 4000, // ms
clientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),
// 随机数 每次不能重复
username: "emqx_test",
password: "emqx_test",
},
连接
import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)
client.on('connect', (e) => {
// 订阅主题
})
订阅主题
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
消息发布
给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。
client.publish(publishTopic, `{"messageType":1,"messageContent":""}`, { qos: 0 }, (err) => {
if (!err) {
console.log('发送成功')
client.subscribe(topic, { qos: 1 }, (err) => {
if (!err) {
console.log('订阅成功')
} else {
console.log('消息订阅失败!')
}
})
} else {
console.log('消息发送失败!')
}
})
取消订阅
client.unsubscribe(topicList, (error) => {
console.log('主题为' + topicList + '取消订阅成功', error)
})
断开连接
export function unconnect() {
client.end()
client = null
// Message.warning('服务器已断开连接!')
console.log('服务器已断开连接!')
}
mqtt封装使用(ts版)
import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';
interface ClientOptions extends IClientOptions {
clientId: string;
}
interface SubscribeOptions {
topic: string;
callback: (topic: string, message: string) => void;
subscribeOption?: mqtt.IClientSubscribeOptions;
}
interface PublishOptions {
topic: string;
message: string;
}
class Mqtt {
private static instance: Mqtt;
private client: MqttClient | undefined;
private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};
private pendingSubscriptions: SubscribeOptions[] = [];
private pendingPublications: PublishOptions[] = [];
private isConnected: boolean = false;
private constructor(url?: string) {
if (url) {
this.connect(url);
}
}
public static getInstance(url?: string): Mqtt {
if (!Mqtt.instance) {
Mqtt.instance = new Mqtt(url);
} else if (url && !Mqtt.instance.client) {
Mqtt.instance.connect(url);
}
return Mqtt.instance;
}
private connect(url: string): void {
console.log(url, clientOptions);
if (!this.client) {
this.client = mqtt.connect(url, clientOptions);
this.client.on('connect', this.onConnect);
this.client.on('reconnect', this.onReconnect);
this.client.on('error', this.onError);
this.client.on('message', this.onMessage);
}
}
public disconnect(): void {
if (this.client) {
this.client.end();
this.client = undefined;
this.subscribeMembers = {};
this.isConnected = false;
console.log(`服务器已断开连接!`);
}
}
public subscribe({ topic, callback }: SubscribeOptions): void {
if (this.isConnected) {
this.client?.subscribe(topic, { qos: 1 }, error => {
if (error) {
console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);
} else {
console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
}
});
this.subscribeMembers[topic] = callback;
} else {
this.pendingSubscriptions.push({ topic, callback });
}
}
public unsubscribe(topic: string): void {
if (!this.client) {
return;
}
this.client.unsubscribe(topic, error => {
if (error) {
console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);
} else {
console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
}
});
this.subscribeMembers[topic] = undefined;
}
public publish({ topic, message }: PublishOptions): void {
if (this.isConnected) {
this.client?.publish(topic, message, { qos: 1 }, e => {
if (e) {
console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
}
});
} else {
this.pendingPublications.push({ topic, message });
}
}
private onConnect = (e: any): void => {
console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);
this.isConnected = true;
this.processPendingSubscriptions();
this.processPendingPublications();
};
private onReconnect = (): void => {
console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
this.isConnected = false;
};
private onError = (error: Error): void => {
console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
this.isConnected = false;
};
private onMessage = (topic: string, message: Buffer): void => {
// console.log(
// `客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `,
// message.toString(),
// );
const callback = this.subscribeMembers?.[topic];
callback?.(topic, message.toString());
};
private processPendingSubscriptions(): void {
while (this.pendingSubscriptions.length > 0) {
const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
this.subscribe({ topic, callback, subscribeOption });
}
}
private processPendingPublications(): void {
while (this.pendingPublications.length > 0) {
const { topic, message } = this.pendingPublications.shift()!;
this.publish({ topic, message });
}
}
}
const clientOptions: ClientOptions = {
clean: true,
connectTimeout: 500,
protocolVersion: 5,
rejectUnauthorized: false,
username: 'admin',
password: 'Anjian-emqx',
clientId: `client-${Date.now()}`
};
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);
注意:
- 环境配置
.env.test
VITE_OTHER_SERVICE_BASE_URL= `{
"mqtt": "ws://192.168.11.14:8083/mqtt"
}`
qos
设置 前后端统一为1