Java模拟Mqtt客户端基本流程
引入Paho MQTT客户端库
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
设置mqtt配置数据
在application.yml中添加如下配置
mqtt:
  broker-url: tcp://42.194.132.44:1883
  client-id: mqtt_receive_server
  username: mqtt_server
  password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
MqttClient配置
将MqttClient加入到IoC容器,并连接客户端
package com.angel.ocean.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers a Paho {@link MqttClient} bean.
 *
 * <p>Broker URL, client id and credentials are injected from the {@code mqtt.*}
 * properties. The client is connected before it is handed to the container.
 */
@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    /**
     * Creates the MQTT client and connects it to the configured broker.
     *
     * @return the client, already connected
     * @throws MqttException if the client cannot be created or the connect call fails
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient mqttClient = new MqttClient(brokerUrl, clientId);
        mqttClient.connect(connectOptions());
        return mqttClient;
    }

    /** Builds clean-session connect options carrying the configured credentials. */
    private MqttConnectOptions connectOptions() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());
        connectOptions.setCleanSession(true);
        return connectOptions;
    }
}
MqttService
mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接
package com.angel.ocean.mqtt;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * Service wrapper around the shared Paho {@link MqttClient} offering the basic
 * client operations: connect, subscribe, publish and disconnect.
 */
@Slf4j
@Service
public class MqttService {

    @Resource
    private MqttClient client;

    @Resource
    private KafkaService kafkaService;

    /**
     * Installs the message callback and subscribes to the predefined topics
     * once dependency injection has completed.
     *
     * @throws MqttException declared for callers; subscription failures themselves
     *                       are logged inside {@link #subscribe(String)}
     */
    @PostConstruct
    public void init() throws MqttException {
        client.setCallback(new MqttCallbackHandler(kafkaService));
        subscribe(MqttTopicConstant.ACTIVATE);
        subscribe(MqttTopicConstant.RESET);
        subscribe(MqttTopicConstant.ONLINE);
        subscribe(MqttTopicConstant.OFFLINE);
        subscribe(MqttTopicConstant.REPORT);
    }

    /**
     * Connects the client with the given credentials if it is not already connected.
     *
     * <p>The connection uses a clean session, so this method does not restore any
     * earlier subscriptions; callers must subscribe again after connecting.
     *
     * @param username broker user name
     * @param password broker password
     * @throws MqttException if the connect call fails
     */
    public void connect(String username, String password) throws MqttException {
        if (!client.isConnected()) {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);
            client.connect(options);
        }
    }

    /**
     * Publishes a text message with QoS 0. Failures are logged, not thrown.
     *
     * @param topic topic to publish to
     * @param data  message text; encoded as UTF-8
     */
    public void publish(String topic, String data) {
        if (!client.isConnected()) {
            log.warn("Message publish failed, client:{} not online.", client.getClientId());
            return;
        }
        // Encode explicitly as UTF-8 so the wire format does not depend on the JVM default charset.
        MqttMessage message = new MqttMessage(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(0);
        try {
            client.publish(topic, message);
            log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);
        } catch (MqttException e) {
            log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);
        }
    }

    /**
     * Subscribes the client to a topic. Failures are logged, not thrown.
     *
     * @param topic topic filter to subscribe to
     */
    public void subscribe(String topic) {
        if (!client.isConnected()) {
            log.warn("Message subscribe failed, client:{} not online.", client.getClientId());
            return;
        }
        try {
            client.subscribe(topic);
            log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);
        } catch (MqttException e) {
            log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);
        }
    }

    /**
     * Disconnects from the broker and closes the client. Failures are logged, not thrown.
     */
    public void disconnect() {
        try {
            // disconnect() throws on a client that is not connected, which previously
            // skipped close(); only disconnect when there is a live connection.
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
            log.info("Disconnected:{}", client.getClientId());
        } catch (MqttException e) {
            log.error("Message disconnect failed:{}", client.getClientId(), e);
        }
    }
}
自定义MqttCallback
对客户端连接丢失,收到消息做一些模拟处理
package com.angel.ocean.mqtt;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;
@Slf4j
public class MqttCallbackHandler implements MqttCallback {
private KafkaService kafkaService;
public MqttCallbackHandler(KafkaService kafkaService) {
this.kafkaService = kafkaService;
}
@Override
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
log.info("连接断开...", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String data = new String(message.getPayload());
log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
UpData upData = JSONObject.parseObject(data, UpData.class);
UpKafKaData upKafKaData = new UpKafKaData(topic, data);
log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("deliveryComplete---------:{}", token.isComplete());
}
}
MqttController
用于模拟客户端行为
package com.angel.ocean.controller;
import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST controller used to drive the MQTT client manually: subscribe, publish and disconnect.
 *
 * @author Jaime.yu
 * @time 2024-12-01
 */
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {

    @Resource
    private MqttService mqttService;

    /**
     * Subscribes the MQTT client to a topic.
     *
     * @param topic topic to subscribe to
     * @return a success result; the outcome of the subscription is only logged by the service
     */
    @GetMapping("/subscribe")
    public ApiResult<?> subscribe(String topic) {
        mqttService.subscribe(topic);
        return ApiResult.success();
    }

    /**
     * Publishes a message through the MQTT client.
     *
     * @param topic   topic to publish to
     * @param message message text
     * @return a success result; the outcome of the publish is only logged by the service
     */
    @GetMapping("/publish")
    public ApiResult<?> publish(String topic, String message) {
        mqttService.publish(topic, message);
        return ApiResult.success();
    }

    /**
     * Disconnects the MQTT client from the broker.
     *
     * @return a success result; the outcome of the disconnect is only logged by the service
     */
    @GetMapping("/disconnect")
    public ApiResult<?> disconnect() {
        mqttService.disconnect();
        return ApiResult.success();
    }
}
代码验证
启动mqtt客户端
如下图客户端已上线:
发送消息
如下图,mqtt broker中该客户端的日志显示,已接收到我们发送的数据:hello world
接收数据
首先我们先订阅个主题:mqtt/0/0
使用MQTTX客户端向该主题发消息
Java mqtt客户端接收数据
查看本地Java mqtt客户端收到的消息,如下图所示,已收到该消息
mqtt broker 也可以看到该日志:
断开连接
如下图本地客户端862024121819020已断开连接: