文章目录
在上一篇文章中,我们已经完成了 MQTT 客户端的基础配置 和 连接实现。今天,我们将进一步深入,重点实现 订阅 MQTT 主题 并接收消息的功能。通过本教程,你将掌握如何在 Spring Boot 项目中集成 MQTT,实现高效的消息通信。
1. 目标
本次教程的目标是:
- 订阅指定主题,实时接收 MQTT 服务器发送的消息。
- 处理接收到的消息,打印消息内容和消息来源的主题名称。
2. 关键代码解析
入栈消息配置:MqttInboundConfiguration
MqttPahoMessageDrivenChannelAdapter
是 Spring Integration 提供的 MQTT 入栈适配器。它负责订阅主题,并将接收到的消息通过通道传递给处理器。
package com.takumilove.mqtt.config;
import com.takumilove.mqtt.domain.MqttConfigurationProperties;
import com.takumilove.mqtt.handler.ReceiverMessageHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT入栈配置类
*/
@Configuration
@RequiredArgsConstructor
public class MqttInboundConfiguration {
private final MqttConfigurationProperties mqttConfigurationProperties;
private final MqttPahoClientFactory mqttPahoClientFactory;
private final ReceiverMessageHandler receiverMessageHandler;
/**
* 消息通道:负责传输接收到的消息
*/
@Bean
public MessageChannel messageInboundChannel() {
return new DirectChannel();
}
/**
* MQTT入栈消息适配器:订阅指定主题并接收消息
*/
@Bean
public MessageProducer messageProducer() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
mqttConfigurationProperties.getUrl(), // 服务器地址
mqttConfigurationProperties.getSubClientId(), // 客户端ID
mqttPahoClientFactory, // 客户端工厂
mqttConfigurationProperties.getSubTopic()); // 订阅的主题
adapter.setQos(1); // 消息质量等级:1(至少一次)
adapter.setConverter(new DefaultPahoMessageConverter()); // 消息转换器
adapter.setOutputChannel(messageInboundChannel()); // 输出通道
return adapter;
}
/**
* 消息处理器:接收并处理消息
*/
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel")
public MessageHandler messageHandler() {
return receiverMessageHandler;
}
}
核心解析:
MqttPahoMessageDrivenChannelAdapter
:负责订阅指定的 MQTT 主题并接收消息。messageInboundChannel
:定义一个消息通道,用于传输接收到的消息。ReceiverMessageHandler
:自定义消息处理器,负责处理接收到的消息。
消息处理器:ReceiverMessageHandler
接收到的消息会通过自定义处理器进行处理,并打印消息内容和对应的主题名称。
package com.takumilove.mqtt.handler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* MQTT消息处理器
*/
@Component
public class ReceiverMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
// 获取消息内容
Object payload = message.getPayload();
// 获取消息头
MessageHeaders headers = message.getHeaders();
String topicName = Objects.requireNonNull(headers.get("mqtt_receivedTopic")).toString();
// 打印消息内容和主题名称
System.out.println("收到消息: " + payload);
System.out.println("消息主题: " + topicName);
}
}
处理逻辑:
message.getPayload()
:获取消息的内容。message.getHeaders()
:获取消息头信息。mqtt_receivedTopic
:消息头中的mqtt_receivedTopic
表示接收消息的主题名称。
3. 配置文件
在 application.yml
中定义 MQTT 的配置信息,包括订阅的主题。
spring:
mqtt:
username: takumilove
password: 123456
url: tcp://156.238.*******:1883
subClientId: sub_client_id_123
subTopic: takumilove/iot/lamp/line
配置项说明:
username
和password
:用于连接 MQTT 服务器的认证信息。url
:MQTT 服务器的地址,格式为tcp://ip:port
。subClientId
:客户端 ID,确保每个客户端 ID 唯一,以避免连接冲突。subTopic
:需要订阅的 MQTT 主题。
4. 运行效果
当 MQTT 服务器向 takumilove/iot/lamp/line
主题发送消息时,控制台将输出如下内容:
收到消息: <消息内容>
消息主题: takumilove/iot/lamp/line
此时,任何订阅了 takumilove/iot/lamp/line
主题的客户端都能接收到这条消息。
5. 总结
今天我们实现了 订阅主题 和 接收消息 的核心功能。主要包括:
- 入栈适配器:通过
MqttPahoMessageDrivenChannelAdapter
订阅主题。 - 消息通道:消息通过
DirectChannel
传输。 - 消息处理器:自定义处理器处理接收到的消息。
通过以上步骤,我们已经成功在 Spring Boot 项目中集成 MQTT,实现了消息的订阅与接收功能。这为进一步开发复杂的物联网应用奠定了坚实的基础。