MQTT
,EMQX
认识,安装部署
一,认识
EMQX
,大规模分布式物联网MQTT
消息服务器,高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网平台与应用。是一款大规模可弹性伸缩的云原生分布式物联网MQTT
消息服务器。MQTT 协议,是基于发布/订阅模式的物联网通信协议,凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山
EMQ
:面向海量的 移动/物联网/车载 等终端接入,并实现在海量物理网设备间快速低延时的消息路由:
- 稳定承载大规模的
MQTT
客户端连接,单服务器节点支持百万连接。 - 分布式节点集群,快速低延时的消息路由,单集群支持千万规模的路由。
- 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
- 完整物联网协议支持,
MQTT
、MQTT-SN
、CoAP
、LwM2M
、私有TCP/UDP
协议支持。
MQTT
是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的,与 HTTP 的 请求(Request)/应答(Response) 的模式有本质的不同。
订阅者(Subscriber) 会向 消息服务器(Broker) 订阅一个 主题(Topic) 。成功订阅后,消息服务器会将该主题下的消息转发给所有的订阅者。
主题(Topic)以 ‘/’ 为分隔符区分不同的层级。包含通配符 ‘+’ 或 ‘#’ 的主题又称为 主题过滤器(Topic Filters) ; 不含通配符的称为 主题名(Topic Names) 例如:
sensor/1/temperature
chat/room/subject
presence/user/feng
sensor/1/#
sensor/+/temperature
uber/drivers/joe/inbox
看下图我们认识一下mqtt 的通信过程
连接
设备与服务端建立长链接后,会派生出一个connect
进程来维护设备与服务端的通信,客户端收到连接协议包,校验完权限后会建立session,session进程建立成功后会与连接进程建立双向绑定,之后连接进程给设备返回connack
协议包;
订阅,取消订阅
conn进程收到SUBSCRIBE协议包后,校验权限后,调用主题,session进程投递消息,session进程处理完订阅逻辑后,给conn进程发送suback
消息
断开连接
设备端发送DISCONNECT, conn进程执行正常stop,session进程监听到EXIT消息后自动退出。
心跳
心跳逻辑仅用的了conn进程,即conn收到PINGREQ立刻返回PINGRESP。
发布
协议控制包类型,详情见:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718019
消息重传 (Message Retransmission)
官网地址:(http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718019)
消息重传,是属于 MQTT
协议标准规范的一部分。协议中规定了作为通信的双方 服务端 和 客户端 对于自己发送到对端的 PUBLISH 消息都应满足其 服务质量 (Quality of Service levels) 的要求。如:
QoS 0:表示 最多一次交付;消息是根据底层网络的能力传递的。接收方不发送响应,发送方不执行重试。消息到达接收者一次或根本不到达。
QoS
1:表示 消息至少送达一次 (At least once delivery);即发送端会一直重发该消息,除非收到了对端对该消息的确认。意思是在 MQTT 协议的上层(即业务的应用层)相同的 QoS 1 消息可能会收到多次。
PUBLISH
#1 Sender ---------------> Receiver (*)
PUBACK
#2 Sender <--------------- Receiver
涉及到 2 个报文;共 2 次发送动作,发送端和接收端各 1 次;这 2 个报文都持有相同的 PacketId。
行尾标记为 * 号的,表示发送方在等待确认报文超时后,可能会主动发起重传。
- QoS 2:表示 消息只送达一次 (Exactly once delivery);即该消息在上层仅会接收到一次。
虽然,QoS 1 和 QoS 2 的 PUBLISH 报文在 MQTT 协议栈这一层都会发生重传,但请你谨记的是:
- QoS 1 消息发生重传后,在 MQTT 协议栈上层,也会收到这些重发的 PUBLISH 消息。
- QoS 2 消息无论如何重传,最终在 MQTT 协议栈上层,都只会收到一条 PUBLISH 消息
PUBLISH
#1 Sender ---------------> Receiver (*)
PUBREC
#2 Sender <--------------- Receiver
PUBREL
#3 Sender ---------------> Receiver (*)
PUBCOM
#4 Sender <--------------- Receiver
涉及到 4 个报文;共 4 次发送动作,发送端和接收端各 2 次;这 4 个报文都持有相同的 PacketId。
行尾标记为 * 号的,表示发送方在等待确认报文超时后,可能会主动发起重传
消息顺序
当然,以上的概念仅需要了解即可,你最需要关心的是,消息在被重复发送后,消息顺序出现的变化,尤其是 QoS 1 类的消息。例如:
假设,当前飞行窗口设置为 2 时,EMQX 计划向客户端的某主题投递 4 条 QoS 1 的消息。并假设客户端程序、或网络在中间出现过问题,那么整个发送流程会变成:
#1 [4,3,2,1 || ] -----> []
#2 [4,3 || 2, 1] -----> [1, 2]
#3 [4 || 3, 2] -----> [1, 2, 3]
#4 [4 || 3, 2] -----> [1, 2, 3, 2, 3]
#5 [ || 4] -----> [1, 2, 3, 2, 3, 4]
#6 [ || ] -----> [1, 2, 3, 2, 3, 4]
流程共 6 个步骤;左边表示 EMQX 的 消息队列 和 飞行窗口,以 ||
分割;右侧表示客户端收到的消息顺序,其中每步表示:
- Broker 将 4 条消息放入消息队列中。
- Broker 依次发送
1
2
,并将其放入 飞行窗口 中;客户端仅应答消息1
;且此时由于客户端发送流出现了问题,无法发送后续应答报文。 - Broker 收到消息
1
的应答;从飞行窗口移除消息1
;并将3
发送出去;继续等待2
3
的应答; - Broker 等待应答超时,重发了报文
2
3
;客户端收到重发的报文2
3
并正常应答。 - Broker 从飞行窗口移除了消息
2
3
,并发送报文4
;客户端收到了报文 4 并回复应答。 - 至此,所有报文处理完成。客户端收到的报文顺序为
[1, 2, 3, 2, 3, 4]
,并也依次上报给 MQTT 协议栈的上层。
虽然,存在重复的报文消息。但这是完全符合协议的规范的,每个报文第一次出现的位置都是有序的,并且重复收到的报文 2
3
的报文中,会携带一个标识位,表明其为重发报文。
MQTT 协议和 EMQX 将这个主题认为是 有序的主题 (Ordered Topic)
参见:MQTTv3.1.1 - Message ordering (opens new window)。
它确保 相同的主题和 QoS 下,消息是按顺序投递和应答的。
此外,如果用户期望所有主题下的 QoS 1 与 QoS 2 消息都严格有序,那么需要设置飞行窗口的最大长度为 1,但代价是会降低该客户端的吞吐。
详情请见:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718101
优点:
- 源码开放。
- 单节点支持500万
MQTT
设备连接 ,集群可扩展至一亿并发MQTT
连接; - 单节点支持每秒实时接收、移动、处理与分发数百万条的
MQTT
消息。 - 基于
OTP
软实时的运行时系统设计,消息分发与投递时延低于 1 毫秒 - 采用
Masterless
的大规模分布式集群架构,实现系统高可用和水平扩展。
当我们接触当新技术,要养成查看官网,以及相关文档的习惯。这样会极快的帮助我们高效的学习、掌握它。然后再根据搜索引擎进行查漏补缺。以下是
EMQX
相关连接
官网地址:https://www.emqx.io/zh
在线文档:https://www.emqx.io/docs/zh/v5.0/#%E4%BA%A7%E5%93%81%E4%BC%98%E5%8A%BF
下载地址:https://www.emqx.io/zh/downloads
二,安装
下载地址:https://www.emqx.io/zh/downloads
这里看自己需求,通过什么方式安装。
docker
,安装
获取docker镜像
docker pull emqx/emqx:5.0.7
启动 Docker 容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.7
这里需要绑定映射出来配置文件,参考一下目录,这里不在多做解释,不会docker 绑定物理地址,请看我的docker 相关博客,比较简单 只需 命令 -v;
linux
,安装
二进制安装
下载
wget https://www.emqx.com/zh/downloads/broker/5.0.7/emqx-5.0.7-el8-amd64.rpm
安装
sudo yum install emqx-5.0.7-el8-amd64.rpm
启动
sudo emqx start
相关目录
zip包安装
下载地址:https://github.com/emqx/emqx/releases/tag/v5.0.7
百度网盘地址:
为什么是选择下载这个?多看看文档介绍。
下载
wget https://www.emqx.com/zh/downloads/broker/5.0.7/emqx-5.0.7-el7-amd64.tar.gz
解压
tar -zxvf emqx-5.0.7-el7-amd64.tar.gz
查看目录
启动
./bin/emqx start ./bin/emqx_ctl status
停止
EMQX Broker
./bin/emqx stop
卸载
EMQX Broker
直接删除 EMQX 目录即可
Linux环境注意版本,和依赖即可
三,部署
这里以docker 部署为例。
拉取镜像
docker pull emqx/emqx:5.0.7
启动容器
docker run -d -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 --restart=always --name emqx [容器id]
top:对于命令不懂可以查看我之前docker系列博客
访问控制台:ip:18083
默认 用户名:admin
密码: public
首次进来是 英文, 在 setting 将 英文 改为简体中文即可
四,配置
编辑用户名密码配置文件
vim ./etc/emqx/plugins/emqx_auth_username.conf
关闭匿名登录
vim /etc/emqx/emqx.conf
查找allow_anonymous,修改为false
Top:小技巧 vi 下 可以 直接输入 /[字符] 回车查找, n 是下一个
五,测试,使用 mqtt.fx
界面介绍
配置服务器名称,
IP
地址,端口, 其他默认即可,如果服务器配置了不允许匿名,这里我们需要 User Credentials 菜单栏,配置账号密码
连接成功,先订阅主题 ,然后再发布
主题: 这里的主题,我们自定义即可,只要在稍后发布的时候能一致就行,然后点击subscribe
订阅即可
可以看到已经有了一个客户端,而且可以看到有个一个主题了。
发布消息
查看消息
五,总结
大多是官网资料总结,如果在之前有接触过 ActiveMQ 这一类的消息中间件,理解起来相对简单一点