环境:JDK17 Springboot 3.2.4
- 引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
- 配置MqttClient bean
@Bean
public MqttClient mqttClient() throws MqttException {
//new MemoryPersistence()确保文件paho文件不会生成,或者new MqttDefaultFilePersistence(dir)自定义文件生成的路径
MqttClient client = new MqttClient("tcp://mqtt.xxxx.cn:1883", MqttClient.generateClientId(), new MemoryPersistence());
client.setCallback(new MqttCallbackListener());
client.connect(mqttConnectOptions());
client.subscribe(MqttTopicEnum.TOPIC_GENERIC);
return client;
}
/**
* 连接参数
*/
private MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("账号");
options.setPassword("密码".toCharArray());
options.setServerURIs(new String[]{"tcp://mqtt.xxxx.cn:1883"});
options.setConnectionTimeout(30);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);
//cleanSession 为false时会持久化会话,重连时会使用之前的会话信息包括未处理的消息,设置为true时重连后开启的是新的会话
options.setCleanSession(true);
return options;
}
- 发布消息
/**
* 发布消息 - MQTT
* 参数:
* qos 表示服务质量,0 最多一次消息可能送达一次也可能无法送达;1 至少一次,可能出现重复;2 恰好一次确保每条消息仅被接收一次
* qos 服务质量越高越耗性能
* retained 是否将当前topic最新的数据保留(其它数据来了会覆盖)用于新订阅者能立即获取到最新状态数据
* mqttClient.getTopic("topic").publish().waitForCompletion() 手动确认消息发送成功, 关注mqttClient.publish方法源码可发现调用
*/
@Override
public void publish(Object message, String topic) {
if (SystemUtil.isNull(message)) {
return;
}
String data = message instanceof String value ? value : JSON.toJSONString(message);
byte[] payload = data.getBytes(StandardCharsets.UTF_8);
try {
mqttClient.publish(topic, payload, 0, false);
} catch (Exception e) {
log.warn("MQTT发送消息失败【{}】【{}】", topic, data, e);
}
}
- 接收消息
public class MqttCallbackListener implements MqttCallbackExtended {
/**
* 连接成功订阅主题
*/
@Override
public void connectComplete(boolean reconnect, String uri) {
log.info("MQTT连接成功【{}】【{}】", (reconnect ? "重连" : "直连"), uri);
}
@Override
public void connectionLost(Throwable throwable) {
log.warn("MQTT连接断开", throwable);
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
try {
String data = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
log.info("MQTT收到主题为【{}】的数据:【{}】", topic, data);
} catch (Exception e) {
log.warn("MQTT数据处理发生异常", e);
}
}
@SneakyThrows(MqttException.class)
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String data = new String(token.getMessage().getPayload(), StandardCharsets.UTF_8);
String clientId = token.getClient().getClientId();
log.info("MQTT发送消息成功【{}】【{}】", clientId, data);
}
}
注意:JDK 21 会有不同后续跟进