mqtt快速部署、集成、调优

发布于:2025-08-07 ⋅ 阅读:(21) ⋅ 点赞:(0)

mqtt快速部署、集成、调优

1.部署

1.1 docker部署

参考:https://blog.csdn.net/taotao_guiwang/article/details/135508643
在这里插入图片描述

1.2 mqtt部署

# Grant permissions; run this from inside the mqtt_install directory.
# NOTE(review): 777 on the whole tree is broader than an installer normally
# needs - confirm whether making only the scripts executable is enough.
chmod -R 777 .
# Run the installer script.
./mqtt_install.sh
  • 部署完成:
    在这里插入图片描述
  • 访问:http://10.86.97.210:38083/
    账号:admin
    初始密码:public
    首次登录会提示修改密码。
    在这里插入图片描述

2. Spring Boot 集成

2.1 集成

  • 核心代码,MqttConfig.java,包括消息订阅、消息发布、消费异常监听等。
package com.mqtt;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.integration.stream.CharacterStreamReadingMessageSource;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.ErrorMessage;

import java.util.UUID;

/**
 * Spring Integration wiring for MQTT.
 *
 * <p>Declares the inbound (subscribe) flow, the outbound (publish) flow, an
 * error handler for failures raised while consuming, and the
 * {@link MqttGateway} used by application code to publish messages.
 *
 * <p>Both the inbound adapter and the outbound handler are created from the
 * same {@link MqttPahoClientFactory}, so the configured credentials,
 * connection timeout and reconnect option apply to the consumer as well as
 * the producer.
 */
@Configuration
public class MqttConfig {

    /** Topic the inbound adapter subscribes to; adjust for the real project. */
    private static final String SUBSCRIBE_TOPIC = "sensors/temperature";

    /** Name of the channel that receives errors raised while consuming. */
    private static final String CONSUMER_ERROR_CHANNEL = "mqttConsumerErrorChannel";

    @Value("${mqtt.brokerUrl}") private String brokerUrl;
    @Value("${mqtt.username}") private String username;
    @Value("${mqtt.password}") private String password;
    @Value("${mqtt.timeout}") private int timeout;
    @Value("${mqtt.clientId}") private String clientId;

    // ---- Subscription (inbound) ----

    /**
     * Channel that carries messages received from the broker.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter that subscribes to {@link #SUBSCRIBE_TOPIC} with QoS 1
     * and forwards messages to {@link #mqttInputChannel()}.
     *
     * <p>The adapter is built from {@link #mqttClientFactory()} rather than
     * from the bare broker URL, so it connects with the configured username
     * and password. The broker address comes from the factory's server URIs.
     *
     * @return the configured inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        // Producer and consumer must not share a client id, hence the suffix.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + "-consumer", mqttClientFactory(), SUBSCRIBE_TOPIC);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        adapter.setErrorChannelName(CONSUMER_ERROR_CHANNEL);
        return adapter;
    }

    /**
     * Handles errors routed to the consumer error channel by printing the
     * message of the wrapped exception.
     *
     * @param error the error message whose payload is the thrown exception
     */
    @ServiceActivator(inputChannel = CONSUMER_ERROR_CHANNEL)
    public void handleError(ErrorMessage error) {
        System.out.println("消费异常:"+error.getPayload().getMessage());
    }

    /**
     * Consumer for messages arriving on {@code mqttInputChannel}; prints the
     * payload of each received message.
     *
     * @return the message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message ->
                System.out.println("消费到主题为" + SUBSCRIBE_TOPIC + "的消息:"+message.getPayload());
    }

    // ---- Publishing (outbound) ----

    /**
     * Client factory holding the broker URI, credentials, connection timeout
     * and automatic-reconnect setting shared by the inbound and outbound
     * endpoints.
     *
     * @return the configured client factory
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { brokerUrl });
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setAutomaticReconnect(true);

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Outbound handler that publishes messages sent to
     * {@code mqttOutboundChannel}. Publishing is asynchronous and async
     * events are enabled; messages without a topic header go to
     * {@code defaultTopic}.
     *
     * @return the configured outbound handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        // Producer and consumer must not share a client id, hence the suffix.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + "-producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setAsyncEvents(true);
        messageHandler.setDefaultTopic("defaultTopic");
        return messageHandler;
    }

    /**
     * Channel that carries messages to be published.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Gateway through which application code publishes MQTT messages.
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        /**
         * Publishes to the handler's default topic.
         *
         * @param payload message body
         */
        void sendToMqtt(String payload);

        /**
         * Publishes to an explicitly chosen topic and QoS.
         *
         * @param topic   destination topic
         * @param qos     quality of service level
         * @param payload message body
         */
        void sendToMqtt(
                @Header(MqttHeaders.TOPIC) String topic,
                @Header(MqttHeaders.QOS) int qos,
                String payload
        );

    }

}

  • MqttSendEventListener.java,监听发送状态,捕获发送异常:
package com.mqtt;

import org.springframework.context.ApplicationListener;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.stereotype.Component;

/**
 * Application listener for {@link MqttMessageSentEvent}.
 *
 * <p>Prints a failure line when the event carries a cause and a success line
 * otherwise.
 *
 * <p>NOTE(review): confirm whether a sent event is ever published with a
 * non-null cause; if not, the failure branch is never taken.
 */
@Component
public class MqttSendEventListener implements ApplicationListener<MqttMessageSentEvent> {
    /**
     * Reports the outcome carried by a sent event.
     *
     * @param event the event published for a sent message
     */
    @Override
    public void onApplicationEvent(MqttMessageSentEvent event) {
        final Throwable failure = event.getCause();
        if (failure == null) {
            System.out.println("消息发送成功: Topic="+event.getTopic()+", Payload="+event.getMessage());
            return;
        }
        final String failedTopic = event.getTopic();
        System.out.println("消息发送异常: Topic="+failedTopic+", 错误="+failure.getMessage());
    }
}

  • MqttController.java,模拟调用:
package com.mqtt;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;

/**
 * REST endpoint that demonstrates publishing through the MQTT gateway.
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {
    /** Topic used for the explicit-topic publish. */
    private static final String TEMPERATURE_TOPIC = "sensors/temperature";

    /** QoS levels: 0 = at most once, 1 = at least once, 2 = exactly once. */
    private static final int QOS_EXACTLY_ONCE = 2;

    /** Response body returned once both publishes have been requested. */
    private static final String SENT_REPLY = "消息已发送";

    @Autowired
    private MqttConfig.MqttGateway mqttGateway;

    /**
     * Publishes the request body twice: once to the default topic and once
     * to {@code sensors/temperature} with QoS 2.
     *
     * @param message raw request body used as the MQTT payload
     * @return a fixed confirmation string
     */
    @PostMapping("/send-command")
    public String sendCommand(@RequestBody String message) {
        // First publish: no topic given, so the gateway's default applies.
        mqttGateway.sendToMqtt(message);

        // Second publish: topic and QoS chosen by the caller.
        mqttGateway.sendToMqtt(TEMPERATURE_TOPIC, QOS_EXACTLY_ONCE, message);

        return SENT_REPLY;
    }

}

  • application.yml:
# HTTP port of the demo application.
server:
  port: 8080

# MQTT settings, read by MqttConfig through @Value("${mqtt.*}").
# NOTE(review): credentials are inline for the demo; externalize them for real deployments.
mqtt:
  # Broker address handed to the Paho connect options as the server URI.
  brokerUrl: tcp://10.86.97.210:20003
  username: admin
  password: test@123456
  # Base client id; MqttConfig appends "-consumer" / "-producer" to it.
  clientId: clientId
  # Passed to MqttConnectOptions.setConnectionTimeout (seconds per the Paho docs).
  timeout: 30
  • pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.elk</groupId>
	<artifactId>spring-boot-mqtt</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spring-boot-mqtt</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.15</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>3.12.0</version> <!-- recommended stable version -->
		</dependency>

		<!-- hutool utility library -->
		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.7.17</version>
		</dependency>

		<!-- Alibaba JSON parser (fastjson) -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.83</version>
		</dependency>

		<!-- MQTT: Spring Integration starter, MQTT module, stream module and the Paho client -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>

		<!-- NOTE(review): only an unused import in MqttConfig references this module; confirm it is needed -->
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>

		<!-- NOTE(review): presumably also pulled in transitively by spring-integration-mqtt; confirm the explicit pin is intended -->
		<dependency>
			<groupId>org.eclipse.paho</groupId>
			<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
			<version>1.2.5</version>
		</dependency>

	  </dependencies>

</project>

2.2 调用

  • Postman 调用:
    在这里插入图片描述
  • 控制台打印:
    在这里插入图片描述

3.相关资源

百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac
在这里插入图片描述

spring官方的集成示例,可参考:https://github.com/spring-projects/spring-integration-samples


网站公告

今日签到

点亮在社区的每一天
去签到