一、问题描述
我们公司是物联网科技公司,现在需要对一批下位机的监测数据进行收集分析。如何与下位机进行通讯并获取下位机上传的监测数据呢?
二、解决方案
现通过SpringBoot + EMQX + MQTT通讯协议,建立与下位机的通讯,实现对下位机上传的监测数据进行接收。
三、解决步骤
1. 安装 EMQX 消息中间件
1.1 下载 EMQX 安装包(这里使用的Windows 版本进行测试)
直接去管网进行下载(EMQX 下载),EMQX从5.4.0版本起,不再支持windows系统,所以我们选5.3.2的windows版本进行下载(Windows 版本在最下面)。
1.2 安装 EMQX
解压下载的安装包,注意,路径不要带中文,得到如下目录
1.3 启动 EMQX
在bin目录下启动控制台
在控制台里输入命令,启动 EMQX
// 启动 emqx
emqx start
// 停止 emqx
emqx stop
如图,则启动成功
1.4 登录 EMQX
访问 Web页面地址,默认账号密码:admin/public
http://localhost:18083
登录成功后
2. SpringBoot 整合 EMQX
2.1 在配置文件中配置 EMQX 的相关信息
--- #################### emqx 相关配置 ####################
iot:
emq:
# MQTT 登录账号(与 EMQX 中设置的一致)
username: user001
# MQTT 登录密码
password: tc123456
# MQTT 服务端连接地址(支持 tcp、ssl、ws 等协议)
# 示例:tcp://localhost:1883 或 ssl://host:8883
hostUrl: tcp://127.0.0.1:1883
# 客户端唯一 ID,多个客户端不能重复
# 可使用 Spring 表达式随机生成,如:${random.int}
clientId: ${random.int}
# 连接超时时间(单位:秒),默认建议 10 秒
timeout: 10
# 心跳包发送间隔(单位:秒),默认 60 秒
keepalive: 60
# 是否清除会话(false 表示保留订阅信息,接收离线期间的消息)
# true 表示每次连接都是全新会话,断开后无法接收历史消息
clearSession: false
# 是否启用共享订阅(用于多个客户端负载均衡接收同一个主题的消息)
# 开启后将自动使用 `$share/组名/主题` 格式
sharedGroup: false
# 默认订阅的主题列表(可配置多个,这里订阅所有的主题)
defaultTopics:
- /+
2.2 代码相关部分
代码结构
编写 MQTT 客户端常用的回调接口 MqttCallbackExtended,用于处理 连接状态变化、消息到达、消息发送完成等事件。
/**
* Description: 用于处理MQTT连接的回调,如连接断开、消息到达、消息发布完成、连接完成等事件。
*
* @author JavaGuru_LiuYu
* Date 2025-06-24 16:12:26
*/
@Slf4j
@Component
public class EmqxCallback implements MqttCallbackExtended {
@Resource
private EmqxService emqxService;
@Resource
private EmqxClient emqxClient;
/**
* 当连接丢失时触发,例如网络异常或服务端断开连接
* 会自动调用 EmqxClient 的重连机制(带指数退避)
*/
@Override
public void connectionLost(Throwable cause) {
log.warn("MQTT 连接断开,准备 2 秒后重连...", cause);
emqxClient.scheduleReconnectWithBackoff();
}
/**
* 当订阅的主题收到消息时触发
*
* @param topic 消息所属的主题
* @param mqttMessage 收到的消息内容
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
emqxService.subscribeCallback(topic, mqttMessage);
}
/**
* 当消息成功发送完成时触发(QoS 1/2 的 ACK 回执)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("消息发送完成: {}", token.getMessageId());
}
/**
* 当连接成功建立时触发(包括初始连接或重连)
*
* @param reconnect 是否是重连
* @param serverUri 连接的服务器地址
*/
@Override
public void connectComplete(boolean reconnect, String serverUri) {
log.info("MQTT 已连接到服务器: {}(是否为重连:{})", serverUri, reconnect);
// 自动重新订阅所有主题
emqxService.subscribe(emqxClient.getMqttClient());
}
}
MQTT 客户端管理器,封装连接到 EMQX 服务的所有操作(包含重连机制)。
/**
* Description: MQTT客户端类,负责建立与MQTT服务器的连接,提供发布消息和订阅主题的功能
*
* @author JavaGuru_LiuYu
* Date 2025-06-24 16:17:28
*/
@Slf4j
@Data
@Component
public class EmqxClient {
@Resource
private EmqxCallback emqxCallback;
@Resource
private MqttConfig mqttConfig;
// MQTT 客户端实例
private MqttClient mqttClient;
// 用于调度重连的线程池(单线程)
private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
// 防止重复重连的标志
private volatile boolean reconnecting = false;
/**
* 初始化或主动连接 MQTT
*/
public synchronized void connect() {
try {
// 如果还未创建 mqttClient,则先构造
if (mqttClient == null) {
createMqttClient();
}
// 如果未连接,则尝试连接
if (!mqttClient.isConnected()) {
mqttClient.connect(createMqttOptions());
log.info("MQTT 客户端连接成功 ✅");
}
} catch (MqttException e) {
log.error("MQTT 初始连接失败 ❌,准备进入重连流程", e);
// 启动自动重连机制
scheduleReconnectWithBackoff();
}
}
/**
* 异步重连任务(带指数退避)
*/
public void scheduleReconnectWithBackoff() {
if (reconnecting) {
return; // 已经有重连任务在执行,避免重复
}
reconnecting = true;
reconnectScheduler.execute(() -> {
int retry = 0;
long delay = 2000; // 初始延迟:2秒
while (!isConnected()) {
try {
retry++;
log.warn("MQTT 断线重连中,第 {} 次,等待 {} ms", retry, delay);
Thread.sleep(delay);
connect(); // 尝试连接(内部已做判断)
if (isConnected()) {
log.info("MQTT 重连成功 ✅");
reconnecting = false;
return;
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("MQTT 重连异常 ❌", e);
}
// 延迟时间指数增长,最大不超过 60 秒
delay = Math.min(delay * 2, 60000);
}
reconnecting = false;
});
}
/**
* 创建 MQTT 客户端
*/
private void createMqttClient() throws MqttException {
String clientId = "TC-" + mqttConfig.getClientId() + "-" + UUID.randomUUID();
mqttClient = new MqttClient(mqttConfig.getHostUrl(), clientId, new MemoryPersistence());
mqttClient.setCallback(emqxCallback);
}
/**
* 构造连接参数(用户名、密码等)
*/
private MqttConnectOptions createMqttOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
options.setConnectionTimeout(mqttConfig.getTimeout());
options.setKeepAliveInterval(mqttConfig.getKeepalive());
options.setCleanSession(mqttConfig.isClearSession());
return options;
}
/**
* 判断客户端是否连接成功
*/
public boolean isConnected() {
return mqttClient != null && mqttClient.isConnected();
}
/**
* 发布消息,未连接时自动尝试重连
*/
public void publish(String topic, String message) {
try {
if (!isConnected()) {
log.warn("MQTT 未连接,准备重连后再发布");
scheduleReconnectWithBackoff();
return;
}
mqttClient.publish(topic, new MqttMessage(message.getBytes(StandardCharsets.UTF_8)));
log.info("消息发布成功 -> 主题: {}", topic);
} catch (MqttException e) {
log.error("发布消息失败 -> 主题: {}", topic, e);
}
}
/**
* 订阅主题,未连接时自动尝试重连
*/
public void subscribe(String topic) {
try {
if (!isConnected()) {
log.warn("MQTT 未连接,准备重连后再订阅");
scheduleReconnectWithBackoff();
return;
}
mqttClient.subscribe(topic, 1);
log.info("订阅主题成功: {}", topic);
} catch (MqttException e) {
log.error("订阅主题失败: {}", topic, e);
}
}
/**
* Bean 销毁前关闭连接并释放线程池
*/
@PreDestroy
public void shutdown() {
if (isConnected()) {
try {
mqttClient.disconnect();
mqttClient.close();
log.info("MQTT 客户端已安全关闭");
} catch (MqttException e) {
log.error("关闭 MQTT 客户端失败", e);
}
}
reconnectScheduler.shutdownNow();
}
}
MQTT 配置类:用于绑定 application.yml 中的连接信息。
/**
* Description: MQTT 配置类:用于绑定 application.yml 中的连接信息
*
* @author JavaGuru_LiuYu
* Date 2025-06-24 16:22:16
*/
@Data
@Component
@ConfigurationProperties(prefix = "iot.emq")
public class MqttConfig {
/**
* MQTT 用户名
*/
private String username;
/**
* MQTT 密码
*/
private String password;
/**
* MQTT 服务端地址
*/
private String hostUrl;
/**
* 客户端唯一 ID
*/
private String clientId;
/**
* 连接超时时间(单位:秒)
*/
private int timeout;
/**
* 心跳间隔时间(单位:秒)
*/
private int keepalive;
/**
* 是否清除会话
*/
private boolean clearSession;
/**
* 是否启用共享订阅
*/
private boolean sharedGroup;
/**
* 默认订阅的主题列表
*/
private List<String> defaultTopics = new ArrayList<>();
}
MQTT 模块中的核心业务逻辑实现部分。
/**
* Description: 用于处理MQTT消息的具体业务逻辑,如订阅回调
*
* @author JavaGuru_LiuYu
* Date 2025-06-18 14:10:05
*/
public interface EmqxService {
/**
* 订阅回调
*
* @param topic 主题
* @param mqttMessage 消息
*/
void subscribeCallback(String topic, MqttMessage mqttMessage);
/**
* 订阅主题
*
* @param client MQTT 客户端
*/
void subscribe(MqttClient client);
}
/**
* Description: 用于处理MQTT消息的具体业务逻辑,如订阅回调
*
* @author JavaGuru_LiuYu
* Date 2025-06-18 14:10:18
*/
@Slf4j
@Service
public class EmqxServiceImpl implements EmqxService {
/**
* 策略映射池,key 为策略类名,value 为对应处理器
*/
private final Map<String, MqttTopicHandle> handleMap = new ConcurrentHashMap<>();
private final ApplicationContext applicationContext;
@Resource
private MqttConfig mqttConfig;
public EmqxServiceImpl(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
// 初始化映射表
@PostConstruct
public void init() {
handleMap.put("/flow_meter", applicationContext.getBean(FlowMeterMqttTopicHandleImpl.class));
handleMap.put("/liquid_meter", applicationContext.getBean(LiquidMeterMqttTopicHandleImpl.class));
}
/**
* 当收到 MQTT 消息时触发
*
* @param topic 消息主题
* @param mqttMessage MQTT 消息体
*/
@Override
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
log.info("收到消息 -> 主题: {}, 内容: {}", topic, payload);
// 选择合适的策略处理数据
handleMap.getOrDefault(topic, new DefaultMqttTopicHandleImpl())
.handle(topic,payload);
}
/**
* 在 MQTT 连接建立完成后,默认订阅 mqttConfig 配置中的主题
*
* @param client MQTT 客户端实例
*/
@Override
public void subscribe(MqttClient client) {
try {
for (String topic : mqttConfig.getDefaultTopics()) {
client.subscribe(topic, 1);
log.info("默认订阅主题成功: {}", topic);
}
} catch (Exception e) {
log.error("默认订阅主题失败", e);
}
}
}
自动初始化 MQTT 连接,当 Spring Boot 项目启动完成时,自动连接到 EMQX 服务器(MQTT Broker)。
/**
* Description: 用于在应用启动时自动连接MQTT服务器
*
* @author JavaGuru_LiuYu
* Date 2025-06-24 16:29:23
*/
@Component
public class EmqxStart implements ApplicationRunner {
@Resource
private EmqxClient emqxClient;
@Override
public void run(ApplicationArguments args) {
emqxClient.connect();
}
}
在上述的核心业务逻辑实现部分中,我们使用策略模式对代码进行优化,对不同主题推送的数据选择不同的处理策略。
/**
* Description: 主题策略
*
* @author JavaGuru_LiuYu
* Date 2025-06-18 13:47:21
*/
public interface MqttTopicHandle {
/**
* 处理策略
*
* @param topic 主题
* @param payload 载荷
*/
void handle(String topic, String payload);
}
/**
* Description: 液位计MQTT 主题处理策略
*
* @author JavaGuru_LiuYu
* Date 2025-06-18 16:56:09
*/
@Slf4j
@Component
public class LiquidMeterMqttTopicHandleImpl implements MqttTopicHandle {
@Override
public void handle(String topic, String payload) {
log.info("液位计处理策略处理 topic:{}, payload:{}", topic, payload);
// 下面编写对应的处理操作
}
}
/**
* Description: 流量计MQTT 主题处理策略
*
* @author JavaGuru_LiuYu
* Date 2025-06-18 16:53:00
*/
@Slf4j
@Component
public class FlowMeterMqttTopicHandleImpl implements MqttTopicHandle {
@Override
public void handle(String topic, String payload) {
log.info("流量计处理策略处理 topic:{}, payload:{}", topic, payload);
// 下面编写对应的处理操作
}
}
/**
* Description: 默认MQTT 主题处理策略
*
* @author JavaGuru_LiuYu
* Date 2025-06-18 17:01:15
*/
@Slf4j
@Component
public class DefaultMqttTopicHandleImpl implements MqttTopicHandle {
@Override
public void handle(String topic, String payload) {
log.info(" 该 topic:{} 暂无正确处理的策略", topic);
// 下面编写对应的处理操作
}
}
四、测试
我们需要进行三个测试:
- SpringBoot 连接 EMQX 测试
- 设备连接测试
- 策略模式的测试
SpringBoot + EMQX 通过MQTT协议和下位机建立通讯并获取下位机的监测数据–>测试