我们平台设计了多协议兼容的通信体系,比如:NB、MQTT、HTTP、TCP、DB,该篇文章主要讲解以下三种常见的接入方式。
一、核心通信方案:MQTT协议(80%设备接入)
1. 协议选型依据
- 低功耗需求:水务场景中40%设备为电池供电的LoRa传感器
- 弱网适应性:管网监测点多位于地下,网络波动频繁
- 标准支持:遵循MQTT 3.1.1协议规范
2. 关键实现细节
sequenceDiagram
participant Device as 水务设备
participant Broker as EMQX集群
participant Service as 业务服务
Device->>Broker: CONNECT(ClientID=设备ID)
Broker-->>Device: CONNACK(含平台时间同步)
loop 心跳检测
Device->>Broker: PINGREQ(每5分钟)
Broker-->>Device: PINGRESP
end
Device->>Broker: PUBLISH(主题:sys/${productKey}/data)
Broker->>Service: 通过RocketMQ转发消息
Service->>Broker: PUBLISH(主题:sys/${productKey}/control)
3. 优化措施
- 主题设计:采用分层主题结构(sys/{productKey}/productKey/{deviceID}/up)
- QoS策略:常规数据采集使用QoS1,告警消息使用QoS2
- 报文压缩:对JSON数据采用GZIP压缩(压缩率约65%)
- 安全机制:
- 设备级TLS双向认证
- 动态Token更新(每日通过HTTPS获取新token)
二、补充通信方案:TCP长连接(关键设备接入)
1. 应用场景
- 泵站PLC控制器(需实时响应)
- 视频监测设备(高带宽需求)
2. 技术实现
- Netty框架:自定义协议解码器
// 协议帧结构:魔数(2B) + 版本(1B) + 命令字(1B) + 数据长度(4B) + 数据体
public class WaterProtocolDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 8) return;
in.markReaderIndex();
short magic = in.readShort();
if (magic != 0x4853) { // HS魔数
in.resetReaderIndex();
return;
}
byte version = in.readByte();
byte cmd = in.readByte();
int length = in.readInt();
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[length];
in.readBytes(data);
out.add(new ProtocolFrame(version, cmd, data));
}
}
3. 性能保障
- 连接管理:基于Redis的设备连接状态表
- Key:device:${deviceId}
- Value:last_heartbeat_time + channel_id
- 心跳机制:60秒超时断开
- 数据包处理:采用LengthFieldBasedFrameDecoder防粘包
三、旧/外平台 设备/通信方案
HTTP轮询(老旧设备兼容):
- 设备每5分钟上报数据
- 平台返回204 No Content减少数据传输量
WebSocket(调试控制台):
- 用于运维人员实时查看设备状态
- 消息格式与MQTT主题保持兼容
四、通信层关键问题解决
问题场景:大规模设备同时上线导致Broker过载
解决方案:
- 分级连接:将设备按区域划分到不同EMQX集群
- 预热策略:通过设备OTA分批次推送固件更新时间
- 负载监控:Prometheus+Granfana实时监控连接数
- 预警阈值:单节点10万连接
- 连接风暴保护:Netty的ChannelTrafficShapingHandler
五、通信架构整体视图
graph TB
subgraph 通信层
A[MQTT Broker集群] -->|消息路由| B[RocketMQ]
C[TCP接入服务] --> B
D[HTTP适配服务] --> B
end
subgraph 业务层
B --> E[设备状态服务]
B --> F[告警分析服务]
B --> G[数据持久化服务]
end
E --> H[Redis状态缓存]
G --> I[时序数据库]
G --> J[业务MySQL]