一:前提
安装了Emqx开源版、MQTTX客户端
二:订阅发布实现步骤
1.引入依赖
<!--MQTT客户端-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
2.编辑配置文件
mqtt:
broker:
uri: tcp://127.0.0.1:31883
client:
id: mqtt-am-client-${random.uuid}
# 订阅主题配置(支持多个)
inTopics:
- topic: test/topic1
qos: 0
- topic: test/topic2
qos: 1
- topic: test/topic3
qos: 2
# 发布主题配置(支持多个)
outTopics:
- topic: out/topic1
qos: 0
username: am
password: LGyPtuAB4th5p
keepAliveInterval: 60
3.读取配置文件
package com.wtzn.web.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {
private Broker broker;
private Client client;
private List<TopicConfig> inTopics;
private List<TopicConfig> outTopics;
private String userName;
private String password;
private int KeepAliveInterval;
@Data
public static class Broker {
private String uri;
}
@Data
public static class Client {
private String id;
}
@Data
public static class TopicConfig {
private String topic;
private int qos;
}
}
4.创建Mqtt客户端
package com.wtzn.web.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Autowired
private MqttProperties mqttProperties;
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
// 此客户端的用户名和密码
options.setUserName(mqttProperties.getUserName());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setCleanSession(true);
// 设置遗嘱消息
// options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);
// 连接超时重试
options.setConnectionTimeout(5000); //毫秒
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
options.setAutomaticReconnect(true);//网络中断重连
client.connect(options);
return client;
}
}
5.controller层
package com.wtzn.web.controller;
import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.LinkedList;
@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttService mqttService;
@SaIgnore
@PostMapping("/mqtt")
public void publish() {
try {
// LinkedList<Payload> payloadLinkedList=new LinkedList<>();
for(int i=1; i<=10000; i++){
Payload payload=new Payload();
payload.setTemperature(i);
// payloadLinkedList.add(payload);
mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
}
} catch (MqttException e) {
log.error("发布消息失败{}", e.getMessage());
}
log.info("发布消息成功");
}
}
6.service层
package com.wtzn.web.service;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {
@Autowired
private MqttClient mqttClient;
@Autowired
private MqttProperties mqttProperties;
@PostConstruct
public void init() throws MqttException {
mqttClient.setCallback(this);
/* mqttClient.subscribe(mqttProperties.getInTopic());
log.info("订阅主题{}", mqttProperties.getInTopic());
*/
mqttProperties.getInTopics().forEach(x -> {
try {
mqttClient.subscribe(x.getTopic(), x.getQos());
log.info("订阅主题{}", x.getTopic());
} catch (MqttException e) {
throw new RuntimeException(e);
}
});
}
@PreDestroy
public void destroy() throws MqttException {
mqttClient.disconnect();
log.info("与服务器断开连接");
}
/**
* @description: 发送消息
* @param: [message]
* @return: void
**/
public void publish(String topic,int qos,String message) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttClient.publish(topic, mqttMessage);
log.info("向主题【{}】发布消息:【{}】", topic, message);
}
/**
* @description: 接收消息
* @param: [topic, message]
* @return: void
**/
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);
log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());
/* if (payload.getTemperature() > 37) {
publish("发烧");
}*/
}
@Override
public void connectionLost(Throwable cause) {
log.error("连接丢失:{}", cause.getMessage());
}
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
if( token!=null ){
MqttMessage message = null;
try {
message = token.getMessage();
} catch (MqttException e) {
throw new RuntimeException(e);
}
String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
String str = message==null ? null : new String(message.getPayload());
log.info("deliveryComplete: topic={}, message={}", topic, str);
} else {
log.info("deliveryComplete: null");
}
log.info("消息已送达");
}
@Override
public void connectComplete(boolean b, String s) {
mqttProperties.getInTopics().forEach(x -> {
try {
mqttClient.subscribe(x.getTopic(), x.getQos());
log.info("订阅主题{}", x.getTopic());
} catch (MqttException e) {
throw new RuntimeException(e);
}
});
}
}
7.dao层
package com.wtzn.web.domain.bo;
import lombok.Data;
@Data
public class Payload {
private Integer temperature;
}