MQTT 快速部署、集成、调优
1.部署
1.1 docker部署
参考:https://blog.csdn.net/taotao_guiwang/article/details/135508643
1.2 mqtt部署
部署所需资源见百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac
执行mqtt_install.sh,开始部署:
# Grant permissions; run this from inside the mqtt_install directory.
# NOTE(review): 777 makes every file world-writable; presumably the installer
# or its containers need write access here - confirm before tightening.
chmod -R 777 .
# Run the installer
./mqtt_install.sh
- 部署完成:
- 访问:http://10.86.97.210:38083/
账号:admin
初始密码:public
首次登录会提示修改密码。
2. Spring Boot 集成
2.1 集成
- 核心代码,MqttConfig.java,包括消息订阅、消息发布、消费异常监听等。
package com.mqtt;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.integration.stream.CharacterStreamReadingMessageSource;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.ErrorMessage;
import java.util.UUID;
/**
 * Spring Integration wiring for MQTT: one inbound (subscribing) adapter, one
 * outbound (publishing) handler, an error listener for the consumer side and
 * a messaging gateway used by application code to publish.
 *
 * <p>Both the subscriber and the publisher obtain their connections from
 * {@link #mqttClientFactory()}, so the configured credentials, timeout and
 * automatic reconnect apply to both directions.
 */
@Configuration
public class MqttConfig {

    /** Topic the inbound adapter subscribes to; adjust to the real project topic. */
    private static final String SUBSCRIBE_TOPIC = "sensors/temperature";

    @Value("${mqtt.brokerUrl}")
    private String brokerUrl;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    /** Passed to {@code MqttConnectOptions#setConnectionTimeout}. */
    @Value("${mqtt.timeout}")
    private int timeout;

    /** Base client id; "-consumer" / "-producer" is appended per connection. */
    @Value("${mqtt.clientId}")
    private String clientId;

    // ------------------------------------------------------------------
    // Subscription (inbound)
    // ------------------------------------------------------------------

    /**
     * Channel that receives every message delivered by the inbound adapter.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter subscribing to {@code sensors/temperature} with QoS 1.
     *
     * <p>The adapter is built on the shared client factory. The previous
     * URL-based constructor ignored the configured username, password,
     * timeout and reconnect settings for the consumer connection.
     *
     * @return the message-driven MQTT adapter
     */
    @Bean
    public MessageProducer inbound() {
        // The producer and the consumer must not share a client id.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "-consumer", mqttClientFactory(), SUBSCRIBE_TOPIC);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        // Failures raised while handling an inbound message are routed here.
        adapter.setErrorChannelName("mqttConsumerErrorChannel");
        return adapter;
    }

    /**
     * Consumer-side error listener: prints the message of the failure that
     * arrived on {@code mqttConsumerErrorChannel}.
     *
     * @param error the error message whose payload is the thrown exception
     */
    @ServiceActivator(inputChannel = "mqttConsumerErrorChannel")
    public void handleError(ErrorMessage error) {
        System.out.println("消费异常:" + error.getPayload().getMessage());
    }

    /**
     * Handler for messages arriving on {@code mqttInputChannel}; prints the payload.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message ->
                System.out.println("消费到主题为" + SUBSCRIBE_TOPIC + "的消息:" + message.getPayload());
    }

    // ------------------------------------------------------------------
    // Publishing (outbound)
    // ------------------------------------------------------------------

    /**
     * Client factory carrying the broker URL, credentials, connection timeout
     * and automatic-reconnect flag. Shared by the inbound adapter and the
     * outbound handler.
     *
     * @return the configured Paho client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] {brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setAutomaticReconnect(true);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Outbound handler publishing whatever is sent to {@code mqttOutboundChannel}.
     * Messages without a topic header go to {@code defaultTopic}.
     *
     * @return the MQTT publishing handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        // The producer and the consumer must not share a client id.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + "-producer", mqttClientFactory());
        messageHandler.setAsync(true);
        // Enables the send events that MqttSendEventListener subscribes to.
        messageHandler.setAsyncEvents(true);
        messageHandler.setDefaultTopic("defaultTopic");
        return messageHandler;
    }

    /**
     * Channel the gateway writes to and the outbound handler reads from.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** Gateway used by application code to publish MQTT messages. */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {

        /**
         * Publishes to the handler's default topic.
         *
         * @param payload message body
         */
        void sendToMqtt(String payload);

        /**
         * Publishes to an explicit topic with an explicit QoS.
         *
         * @param topic   destination topic
         * @param qos     quality of service (0, 1 or 2)
         * @param payload message body
         */
        void sendToMqtt(
                @Header(MqttHeaders.TOPIC) String topic,
                @Header(MqttHeaders.QOS) int qos,
                String payload);
    }
}
- MqttSendEventListener.java,监听发送状态,捕获发送异常:
package com.mqtt;
import org.springframework.context.ApplicationListener;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.stereotype.Component;
/**
 * Application listener for {@link MqttMessageSentEvent}: prints a success line
 * when the event carries no cause, otherwise prints the cause's message.
 */
@Component
public class MqttSendEventListener implements ApplicationListener<MqttMessageSentEvent> {

    /**
     * Reports the outcome of a send event on standard output.
     *
     * @param event the send event published by the MQTT integration
     */
    @Override
    public void onApplicationEvent(MqttMessageSentEvent event) {
        final Throwable failure = event.getCause();

        // No cause attached: treat the event as a successful send.
        if (failure == null) {
            System.out.println(
                    "消息发送成功: Topic=" + event.getTopic() + ", Payload=" + event.getMessage());
            return;
        }

        final String failedTopic = event.getTopic();
        System.out.println("消息发送异常: Topic=" + failedTopic + ", 错误=" + failure.getMessage());
    }
}
- MqttController.java,模拟调用:
package com.mqtt;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
/**
 * REST endpoint that publishes the request body over MQTT through
 * {@link MqttConfig.MqttGateway}.
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttConfig.MqttGateway gateway;

    /**
     * Publishes the body twice: once to the gateway's default topic and once
     * to {@code sensors/temperature} with QoS 2.
     *
     * @param message raw request body used as the MQTT payload
     * @return a fixed confirmation string
     */
    @PostMapping("/send-command")
    public String sendCommand(@RequestBody String message) {
        // First publish: no topic given, so the default topic is used.
        gateway.sendToMqtt(message);

        // Second publish: explicit topic and QoS
        // (0 = at most once, 1 = at least once, 2 = exactly once).
        final String targetTopic = "sensors/temperature";
        final int exactlyOnce = 2;
        gateway.sendToMqtt(targetTopic, exactlyOnce, message);

        return "消息已发送";
    }
}
- application.yml:
server:
  port: 8080

# MQTT settings, read by MqttConfig through @Value("${mqtt.*}").
# The keys must be nested under "mqtt:" or the placeholders will not resolve.
mqtt:
  brokerUrl: tcp://10.86.97.210:20003
  username: admin
  # NOTE(review): sample credential; supply the real one from outside the file.
  password: test@123456
  # Base client id; MqttConfig appends "-consumer" / "-producer".
  clientId: clientId
  # Passed to MqttConnectOptions.setConnectionTimeout.
  timeout: 30
- pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + MQTT demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.elk</groupId>
<artifactId>spring-boot-mqtt</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-mqtt</name>
<description>Demo project for Spring Boot</description>
<!-- Dependencies declared below without a version inherit it from this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Web starter: provides the REST controller support used by MqttController. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version> <!-- recommended stable version -->
</dependency>
<!-- Hutool utility library -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.17</version>
</dependency>
<!-- Alibaba fastjson JSON parser -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- MQTT: Spring Integration core, its MQTT module, and the Paho client -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Presumably present for the org.springframework.integration.stream import in MqttConfig; confirm before removing. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- Paho MQTT v3 client, version pinned explicitly here. -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
</project>
2.2 调用
- postman调用:
- 控制台打印:
3.相关资源
百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac
spring官方的集成示例,可参考:https://github.com/spring-projects/spring-integration-samples