Spring Boot 集成 MQTT:实现订阅主题与消息接收详解

发布于:2024-12-22 ⋅ 阅读:(15) ⋅ 点赞:(0)

在上一篇文章中,我们已经完成了 MQTT 客户端的基础配置连接实现。今天,我们将进一步深入,重点实现 订阅 MQTT 主题 并接收消息的功能。通过本教程,你将掌握如何在 Spring Boot 项目中集成 MQTT,实现高效的消息通信。


1. 目标

本次教程的目标是:

  • 订阅指定主题,实时接收 MQTT 服务器发送的消息。
  • 处理接收到的消息,打印消息内容和消息来源的主题名称。

2. 关键代码解析

入栈消息配置:MqttInboundConfiguration

MqttPahoMessageDrivenChannelAdapter 是 Spring Integration 提供的 MQTT 入栈适配器。它负责订阅主题,并将接收到的消息通过通道传递给处理器。

package com.takumilove.mqtt.config;

import com.takumilove.mqtt.domain.MqttConfigurationProperties;
import com.takumilove.mqtt.handler.ReceiverMessageHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * MQTT入栈配置类
 */
@Configuration
@RequiredArgsConstructor
public class MqttInboundConfiguration {

    private final MqttConfigurationProperties mqttConfigurationProperties;
    private final MqttPahoClientFactory mqttPahoClientFactory;
    private final ReceiverMessageHandler receiverMessageHandler;

    /**
     * 消息通道:负责传输接收到的消息
     */
    @Bean
    public MessageChannel messageInboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT入栈消息适配器:订阅指定主题并接收消息
     */
    @Bean
    public MessageProducer messageProducer() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttConfigurationProperties.getUrl(),           // 服务器地址
                        mqttConfigurationProperties.getSubClientId(),   // 客户端ID
                        mqttPahoClientFactory,                          // 客户端工厂
                        mqttConfigurationProperties.getSubTopic());     // 订阅的主题

        adapter.setQos(1); // 消息质量等级:1(至少一次)
        adapter.setConverter(new DefaultPahoMessageConverter()); // 消息转换器
        adapter.setOutputChannel(messageInboundChannel());       // 输出通道
        return adapter;
    }

    /**
     * 消息处理器:接收并处理消息
     */
    @Bean
    @ServiceActivator(inputChannel = "messageInboundChannel")
    public MessageHandler messageHandler() {
        return receiverMessageHandler;
    }
}

核心解析

  • MqttPahoMessageDrivenChannelAdapter:负责订阅指定的 MQTT 主题并接收消息。
  • messageInboundChannel:定义一个消息通道,用于传输接收到的消息。
  • ReceiverMessageHandler:自定义消息处理器,负责处理接收到的消息。

消息处理器:ReceiverMessageHandler

接收到的消息会通过自定义处理器进行处理,并打印消息内容和对应的主题名称。

package com.takumilove.mqtt.handler;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * MQTT消息处理器
 */
@Component
public class ReceiverMessageHandler implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        // 获取消息内容
        Object payload = message.getPayload();

        // 获取消息头
        MessageHeaders headers = message.getHeaders();
        String topicName = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();

        // 打印消息内容和主题名称
        System.out.println("收到消息: " + payload);
        System.out.println("消息主题: " + topicName);
    }
}

处理逻辑

  1. message.getPayload():获取消息的内容。
  2. message.getHeaders():获取消息头信息。
  3. mqtt_receivedTopic:消息头中的 mqtt_receivedTopic 表示接收消息的主题名称。

3. 配置文件

application.yml 中定义 MQTT 的配置信息,包括订阅的主题。

spring:
  mqtt:
    username: takumilove
    password: 123456
    url: tcp://156.238.*******:1883
    subClientId: sub_client_id_123
    subTopic: takumilove/iot/lamp/line

配置项说明

  • usernamepassword:用于连接 MQTT 服务器的认证信息。
  • url:MQTT 服务器的地址,格式为 tcp://ip:port
  • subClientId:客户端 ID,确保每个客户端 ID 唯一,以避免连接冲突。
  • subTopic:需要订阅的 MQTT 主题。

4. 运行效果

当 MQTT 服务器向 takumilove/iot/lamp/line 主题发送消息时,控制台将输出如下内容:

收到消息: <消息内容>
消息主题: takumilove/iot/lamp/line

此时,任何订阅了 takumilove/iot/lamp/line 主题的客户端都能接收到这条消息。

image-20241217142917398


5. 总结

今天我们实现了 订阅主题接收消息 的核心功能。主要包括:

  1. 入栈适配器:通过 MqttPahoMessageDrivenChannelAdapter 订阅主题。
  2. 消息通道:消息通过 DirectChannel 传输。
  3. 消息处理器:自定义处理器处理接收到的消息。

通过以上步骤,我们已经成功在 Spring Boot 项目中集成 MQTT,实现了消息的订阅与接收功能。这为进一步开发复杂的物联网应用奠定了坚实的基础。


参考资料