SpringBoot + EMQX 通过MQTT协议和下位机建立通讯并获取下位机的监测数据

发布于:2025-06-26 ⋅ 阅读:(18) ⋅ 点赞:(0)

一、问题描述

我们公司是物联网科技公司,现在需要对一批下位机的监测数据进行收集分析。如何与下位机进行通讯并获取下位机上传的监测数据呢?

二、解决方案

现通过SpringBoot + EMQX + MQTT通讯协议,建立与下位机的通讯,实现对下位机上传的监测数据进行接收。

三、解决步骤

1. 安装 EMQX 消息中间件

1.1 下载 EMQX 安装包(这里使用的Windows 版本进行测试)

直接去管网进行下载(EMQX 下载),EMQX从5.4.0版本起,不再支持windows系统,所以我们选5.3.2的windows版本进行下载(Windows 版本在最下面)。
在这里插入图片描述

1.2 安装 EMQX

解压下载的安装包,注意,路径不要带中文,得到如下目录
在这里插入图片描述

1.3 启动 EMQX

在bin目录下启动控制台
在这里插入图片描述

在控制台里输入命令,启动 EMQX

// 启动 emqx
emqx start

// 停止 emqx
emqx stop 

如图,则启动成功
在这里插入图片描述

1.4 登录 EMQX

访问 Web页面地址,默认账号密码:admin/public
在这里插入图片描述

http://localhost:18083

登录成功后
在这里插入图片描述

2. SpringBoot 整合 EMQX

2.1 在配置文件中配置 EMQX 的相关信息

--- #################### emqx 相关配置 ####################
iot:
  emq:
    # MQTT 登录账号(与 EMQX 中设置的一致)
    username: user001
    # MQTT 登录密码
    password: tc123456
    # MQTT 服务端连接地址(支持 tcp、ssl、ws 等协议)
    # 示例:tcp://localhost:1883 或 ssl://host:8883
    hostUrl: tcp://127.0.0.1:1883
    # 客户端唯一 ID,多个客户端不能重复
    # 可使用 Spring 表达式随机生成,如:${random.int}
    clientId: ${random.int}
    # 连接超时时间(单位:秒),默认建议 10 秒
    timeout: 10
    # 心跳包发送间隔(单位:秒),默认 60 秒
    keepalive: 60
    # 是否清除会话(false 表示保留订阅信息,接收离线期间的消息)
    # true 表示每次连接都是全新会话,断开后无法接收历史消息
    clearSession: false
    # 是否启用共享订阅(用于多个客户端负载均衡接收同一个主题的消息)
    # 开启后将自动使用 `$share/组名/主题` 格式
    sharedGroup: false
    # 默认订阅的主题列表(可配置多个,这里订阅所有的主题)
    defaultTopics:
      - /+

2.2 代码相关部分

代码结构
在这里插入图片描述

编写 MQTT 客户端常用的回调接口 MqttCallbackExtended,用于处理 连接状态变化、消息到达、消息发送完成等事件。

/**
 * Description: 用于处理MQTT连接的回调,如连接断开、消息到达、消息发布完成、连接完成等事件。
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-24 16:12:26
 */
@Slf4j
@Component
public class EmqxCallback implements MqttCallbackExtended {
    @Resource
    private EmqxService emqxService;

    @Resource
    private EmqxClient emqxClient;

    /**
     * 当连接丢失时触发,例如网络异常或服务端断开连接
     * 会自动调用 EmqxClient 的重连机制(带指数退避)
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.warn("MQTT 连接断开,准备 2 秒后重连...", cause);
        emqxClient.scheduleReconnectWithBackoff();
    }

    /**
     * 当订阅的主题收到消息时触发
     *
     * @param topic       消息所属的主题
     * @param mqttMessage 收到的消息内容
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        emqxService.subscribeCallback(topic, mqttMessage);
    }

    /**
     * 当消息成功发送完成时触发(QoS 1/2 的 ACK 回执)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("消息发送完成: {}", token.getMessageId());
    }

    /**
     * 当连接成功建立时触发(包括初始连接或重连)
     *
     * @param reconnect 是否是重连
     * @param serverUri 连接的服务器地址
     */
    @Override
    public void connectComplete(boolean reconnect, String serverUri) {
        log.info("MQTT 已连接到服务器: {}(是否为重连:{})", serverUri, reconnect);
        // 自动重新订阅所有主题
        emqxService.subscribe(emqxClient.getMqttClient());
    }
}

MQTT 客户端管理器,封装连接到 EMQX 服务的所有操作(包含重连机制)。

/**
 * Description: MQTT客户端类,负责建立与MQTT服务器的连接,提供发布消息和订阅主题的功能
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-24 16:17:28
 */
@Slf4j
@Data
@Component
public class EmqxClient {

    @Resource
    private EmqxCallback emqxCallback;

    @Resource
    private MqttConfig mqttConfig;

    // MQTT 客户端实例
    private MqttClient mqttClient;

    // 用于调度重连的线程池(单线程)
    private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();

    // 防止重复重连的标志
    private volatile boolean reconnecting = false;

    /**
     * 初始化或主动连接 MQTT
     */
    public synchronized void connect() {
        try {
            // 如果还未创建 mqttClient,则先构造
            if (mqttClient == null) {
                createMqttClient();
            }

            // 如果未连接,则尝试连接
            if (!mqttClient.isConnected()) {
                mqttClient.connect(createMqttOptions());
                log.info("MQTT 客户端连接成功 ✅");
            }
        } catch (MqttException e) {
            log.error("MQTT 初始连接失败 ❌,准备进入重连流程", e);
            // 启动自动重连机制
            scheduleReconnectWithBackoff();
        }
    }

    /**
     * 异步重连任务(带指数退避)
     */
    public void scheduleReconnectWithBackoff() {
        if (reconnecting) {
            return; // 已经有重连任务在执行,避免重复
        }
        reconnecting = true;

        reconnectScheduler.execute(() -> {
            int retry = 0;
            long delay = 2000; // 初始延迟:2秒

            while (!isConnected()) {
                try {
                    retry++;
                    log.warn("MQTT 断线重连中,第 {} 次,等待 {} ms", retry, delay);

                    Thread.sleep(delay);

                    connect(); // 尝试连接(内部已做判断)

                    if (isConnected()) {
                        log.info("MQTT 重连成功 ✅");
                        reconnecting = false;
                        return;
                    }
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("MQTT 重连异常 ❌", e);
                }

                // 延迟时间指数增长,最大不超过 60 秒
                delay = Math.min(delay * 2, 60000);
            }

            reconnecting = false;
        });
    }

    /**
     * 创建 MQTT 客户端
     */
    private void createMqttClient() throws MqttException {
        String clientId = "TC-" + mqttConfig.getClientId() + "-" + UUID.randomUUID();
        mqttClient = new MqttClient(mqttConfig.getHostUrl(), clientId, new MemoryPersistence());
        mqttClient.setCallback(emqxCallback);
    }

    /**
     * 构造连接参数(用户名、密码等)
     */
    private MqttConnectOptions createMqttOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        options.setConnectionTimeout(mqttConfig.getTimeout());
        options.setKeepAliveInterval(mqttConfig.getKeepalive());
        options.setCleanSession(mqttConfig.isClearSession());
        return options;
    }

    /**
     * 判断客户端是否连接成功
     */
    public boolean isConnected() {
        return mqttClient != null && mqttClient.isConnected();
    }

    /**
     * 发布消息,未连接时自动尝试重连
     */
    public void publish(String topic, String message) {
        try {
            if (!isConnected()) {
                log.warn("MQTT 未连接,准备重连后再发布");
                scheduleReconnectWithBackoff();
                return;
            }
            mqttClient.publish(topic, new MqttMessage(message.getBytes(StandardCharsets.UTF_8)));
            log.info("消息发布成功 -> 主题: {}", topic);
        } catch (MqttException e) {
            log.error("发布消息失败 -> 主题: {}", topic, e);
        }
    }

    /**
     * 订阅主题,未连接时自动尝试重连
     */
    public void subscribe(String topic) {
        try {
            if (!isConnected()) {
                log.warn("MQTT 未连接,准备重连后再订阅");
                scheduleReconnectWithBackoff();
                return;
            }
            mqttClient.subscribe(topic, 1);
            log.info("订阅主题成功: {}", topic);
        } catch (MqttException e) {
            log.error("订阅主题失败: {}", topic, e);
        }
    }

    /**
     * Bean 销毁前关闭连接并释放线程池
     */
    @PreDestroy
    public void shutdown() {
        if (isConnected()) {
            try {
                mqttClient.disconnect();
                mqttClient.close();
                log.info("MQTT 客户端已安全关闭");
            } catch (MqttException e) {
                log.error("关闭 MQTT 客户端失败", e);
            }
        }
        reconnectScheduler.shutdownNow();
    }
}

MQTT 配置类:用于绑定 application.yml 中的连接信息。

/**
 * Description: MQTT 配置类:用于绑定 application.yml 中的连接信息
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-24 16:22:16
 */
@Data
@Component
@ConfigurationProperties(prefix = "iot.emq")
public class MqttConfig {

    /**
     * MQTT 用户名
     */
    private String username;

    /**
     * MQTT 密码
     */
    private String password;

    /**
     * MQTT 服务端地址
     */
    private String hostUrl;

    /**
     * 客户端唯一 ID
     */
    private String clientId;

    /**
     * 连接超时时间(单位:秒)
     */
    private int timeout;

    /**
     * 心跳间隔时间(单位:秒)
     */
    private int keepalive;

    /**
     * 是否清除会话
     */
    private boolean clearSession;

    /**
     * 是否启用共享订阅
     */
    private boolean sharedGroup;

    /**
     * 默认订阅的主题列表
     */
    private List<String> defaultTopics = new ArrayList<>();
}

MQTT 模块中的核心业务逻辑实现部分。

/**
 * Description: 用于处理MQTT消息的具体业务逻辑,如订阅回调
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-18 14:10:05
 */
public interface EmqxService {

    /**
     * 订阅回调
     *
     * @param topic       主题
     * @param mqttMessage 消息
     */
    void subscribeCallback(String topic, MqttMessage mqttMessage);

    /**
     * 订阅主题
     *
     * @param client MQTT 客户端
     */
    void subscribe(MqttClient client);
}
/**
 * Description: 用于处理MQTT消息的具体业务逻辑,如订阅回调
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-18 14:10:18
 */
@Slf4j
@Service
public class EmqxServiceImpl implements EmqxService {

    /**
     * 策略映射池,key 为策略类名,value 为对应处理器
     */
    private final Map<String, MqttTopicHandle> handleMap = new ConcurrentHashMap<>();

    private final ApplicationContext applicationContext;


    @Resource
    private MqttConfig mqttConfig;

    public EmqxServiceImpl(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }


    // 初始化映射表
    @PostConstruct
    public void init() {
        handleMap.put("/flow_meter", applicationContext.getBean(FlowMeterMqttTopicHandleImpl.class));
        handleMap.put("/liquid_meter", applicationContext.getBean(LiquidMeterMqttTopicHandleImpl.class));
    }

    /**
     * 当收到 MQTT 消息时触发
     *
     * @param topic       消息主题
     * @param mqttMessage MQTT 消息体
     */
    @Override
    public void subscribeCallback(String topic, MqttMessage mqttMessage) {
        String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("收到消息 -> 主题: {}, 内容: {}", topic, payload);

        // 选择合适的策略处理数据
        handleMap.getOrDefault(topic, new DefaultMqttTopicHandleImpl())
                .handle(topic,payload);
    }

    /**
     * 在 MQTT 连接建立完成后,默认订阅 mqttConfig 配置中的主题
     *
     * @param client MQTT 客户端实例
     */
    @Override
    public void subscribe(MqttClient client) {
        try {
            for (String topic : mqttConfig.getDefaultTopics()) {
                client.subscribe(topic, 1);
                log.info("默认订阅主题成功: {}", topic);
            }
        } catch (Exception e) {
            log.error("默认订阅主题失败", e);
        }
    }
}

自动初始化 MQTT 连接,当 Spring Boot 项目启动完成时,自动连接到 EMQX 服务器(MQTT Broker)。

/**
 * Description: 用于在应用启动时自动连接MQTT服务器
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-24 16:29:23
 */
@Component
public class EmqxStart implements ApplicationRunner {

    @Resource
    private EmqxClient emqxClient;

    @Override
    public void run(ApplicationArguments args) {
        emqxClient.connect();
    }
}

在上述的核心业务逻辑实现部分中,我们使用策略模式对代码进行优化,对不同主题推送的数据选择不同的处理策略。

/**
 * Description: 主题策略
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-18 13:47:21
 */
public interface MqttTopicHandle {

    /**
     * 处理策略
     *
     * @param topic   主题
     * @param payload 载荷
     */
    void handle(String topic, String payload);
}
/**
 * Description: 液位计MQTT 主题处理策略
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-18 16:56:09
 */
@Slf4j
@Component
public class LiquidMeterMqttTopicHandleImpl implements MqttTopicHandle {

    @Override
    public void handle(String topic, String payload) {
        log.info("液位计处理策略处理  topic:{}, payload:{}", topic, payload);

        // 下面编写对应的处理操作
    }
}
/**
 * Description: 流量计MQTT 主题处理策略
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-18 16:53:00
 */
@Slf4j
@Component
public class FlowMeterMqttTopicHandleImpl implements MqttTopicHandle {

    @Override
    public void handle(String topic, String payload) {
        log.info("流量计处理策略处理  topic:{}, payload:{}", topic, payload);

        // 下面编写对应的处理操作
    }
}
/**
 * Description: 默认MQTT 主题处理策略
 *
 * @author JavaGuru_LiuYu
 * Date 2025-06-18 17:01:15
 */
@Slf4j
@Component
public class DefaultMqttTopicHandleImpl implements MqttTopicHandle {

    @Override
    public void handle(String topic, String payload) {
        log.info(" 该 topic:{} 暂无正确处理的策略", topic);

        // 下面编写对应的处理操作
    }
}

四、测试

我们需要进行三个测试:

  1. SpringBoot 连接 EMQX 测试
  2. 设备连接测试
  3. 策略模式的测试
    SpringBoot + EMQX 通过MQTT协议和下位机建立通讯并获取下位机的监测数据–>测试

网站公告

今日签到

点亮在社区的每一天
去签到