介绍
MQTT(消息队列遥测传输)是ISO标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。
特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
对负载内容屏蔽的消息传输;
使用TCP/IP提供网络连接;
有三种消息发布服务质量;
- 至多一次:消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- 至少一次:确保消息到达,但消息重复可能会发生。
- 只有一次:确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
1.安装RabbitMQ
基于Docker来安装RabbitMQ,使用下面的命令即可:
docker run \
-e RABBITMQ_DEFAULT_USER=mq \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-p 1883:1883 \
-p 15675:15675 \
-d \
rabbitmq:3.8-management
命令解释:
-e RABBITMQ_DEFAULT_USER=itheima
: 设置 RabbitMQ 的默认用户名为 syd。-e RABBITMQ_DEFAULT_PASS=123321
: 设置 RabbitMQ 的默认密码为 123456。-v mq-plugins:/plugins
: 将主机上的mq-plugins
挂载到容器内的/plugins
目录,以便于使用自定义插件。--name mq
: 指定容器的名称为mq
。--hostname mq
: 设置容器的主机名为mq
。-p 15672:15672
: 映射容器的 15672 端口(管理界面)到主机的 15672 端口。-p 5672:5672
: 映射容器的 5672 端口(AMQP 协议)到主机的 5672 端口。-d
: 以分离模式运行容器。
可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
- 1883: mqtt协议端口号
安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。
登录后即可看到管理控制台总览页面:
2.使用mqtt协议
命令
docker exec mq rabbitmq-plugins enable rabbitmq_management
docker exec mq rabbitmq-plugins enable rabbitmq_mqtt
docker exec mq rabbitmq-plugins enable rabbitmq_web_mqtt
我们可以看到,存在了mqtt协议
3.安装mqtt连接工具MQTTX
新建连接:
新建订阅
输入订阅名:
发消息测试:
4.springBoot集成mqtt
GitHub - tocrhz/mqtt-spring-boot-starter: MQTT starter for Spring Boot, easier to use.
4.1安装依赖
<dependency>
<groupId>com.github.tocrhz</groupId>
<artifactId>mqtt-spring-boot-starter</artifactId>
<version>1.2.8.1</version>
</dependency>
4.2 配置
mqtt:
uri: tcp://192.168.152.130:1883
username: 'syd'
password: '123456'
client-id: ${random.uuid}
disable: false
4.3 使用
订阅:
@Component
@Slf4j
public class MqttMessageHandler {
@MqttSubscribe("test/send")
public void sub(String topic, MqttMessage message, @Payload String payload) {
log.info("receive from : {}", topic);
log.info("message payload : {}", new String(message.getPayload(), StandardCharsets.UTF_8));
log.info("string payload : {}", payload);
}
}
发送:
@RestController
public class Send {
private final MqttPublisher publisher;
public Send(MqttPublisher publisher) {
this.publisher = publisher;
}
@GetMapping("/send")
public void send(){
publisher.send("test/send", "你好!");
}
}
客户端订阅
当然也可以根据需要发送响应的数据格式;
一些其他配置,根据需要选择