Java连接Emqx实现订阅发布消息

发布于:2025-07-10 ⋅ 阅读:(10) ⋅ 点赞:(0)

一:前提

        安装了Emqx开源版、MQTTX客户端

二:订阅发布实现步骤

1.引入依赖 

<!--MQTT客户端-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

2.编辑配置文件

mqtt:
  broker:
    uri: tcp://127.0.0.1:31883
  client:
    id: mqtt-am-client-${random.uuid}
  # 订阅主题配置(支持多个)
  inTopics:
    - topic: test/topic1
      qos: 0
    - topic: test/topic2
      qos: 1
    - topic: test/topic3
      qos: 2
  # 发布主题配置(支持多个)
  outTopics:
    - topic: out/topic1
      qos: 0
  username: am
  password: LGyPtuAB4th5p
  keepAliveInterval: 60

3.读取配置文件

package com.wtzn.web.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;


@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {
    private Broker broker;
    private Client client;
    private List<TopicConfig> inTopics;
    private List<TopicConfig> outTopics;
    private String userName;
    private String password;
    private int KeepAliveInterval;

    @Data
    public static class Broker {
        private String uri;
    }

    @Data
    public static class Client {
        private String id;
    }
    @Data
    public static class TopicConfig {
        private String topic;
        private int qos;
    }

}

4.创建Mqtt客户端

package com.wtzn.web.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 此客户端的用户名和密码
        options.setUserName(mqttProperties.getUserName());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setCleanSession(true);
        // 设置遗嘱消息
      //  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下线,这是我的遗嘱".getBytes(), 2, true);
        // 连接超时重试
        options.setConnectionTimeout(5000); //毫秒
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(true);//网络中断重连
        client.connect(options);
        return client;
    }
}

5.controller层

package com.wtzn.web.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedList;


@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    @SaIgnore
    @PostMapping("/mqtt")
    public void publish() {
        try {
          //  LinkedList<Payload> payloadLinkedList=new LinkedList<>();
            for(int i=1; i<=10000; i++){
                Payload payload=new Payload();
                payload.setTemperature(i);
              //  payloadLinkedList.add(payload);
                mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
            }

        } catch (MqttException e) {
            log.error("发布消息失败{}", e.getMessage());
        }
        log.info("发布消息成功");
    }


}

6.service层

package com.wtzn.web.service;

import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;


@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;
    
    @PostConstruct
    public void init() throws MqttException {
        mqttClient.setCallback(this);
 /*       mqttClient.subscribe(mqttProperties.getInTopic());
        log.info("订阅主题{}", mqttProperties.getInTopic());
*/
        mqttProperties.getInTopics().forEach(x -> {
            try {
                mqttClient.subscribe(x.getTopic(), x.getQos());
                log.info("订阅主题{}", x.getTopic());
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        });

    }

    @PreDestroy
    public void destroy() throws MqttException {
        mqttClient.disconnect();
        log.info("与服务器断开连接");
    }

    /**
     * @description: 发送消息
     * @param: [message]
     * @return: void
     **/
    public void publish(String topic,int qos,String message) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        mqttClient.publish(topic, mqttMessage);
        log.info("向主题【{}】发布消息:【{}】", topic, message);
    }


    /**
     * @description: 接收消息
     * @param: [topic, message]
     * @return: void
     **/
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);
        log.info("接收到来自【{}】的消息【{}】", topic, payload.getTemperature());
      /*  if (payload.getTemperature() > 37) {
            publish("发烧");
        }*/


    }


    @Override
    public void connectionLost(Throwable cause) {
        log.error("连接丢失:{}", cause.getMessage());
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = null;
            try {
                message = token.getMessage();
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }

        log.info("消息已送达");
    }

    @Override
    public void connectComplete(boolean b, String s) {

            mqttProperties.getInTopics().forEach(x -> {
                try {
                    mqttClient.subscribe(x.getTopic(), x.getQos());
                    log.info("订阅主题{}", x.getTopic());
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
            });
    }
}

7.dao层

package com.wtzn.web.domain.bo;

import lombok.Data;

@Data
public class Payload {
    private Integer temperature;
}

三:测试

1.PostMan直接调用测试

2、下载MQTTX客户端进行测试


网站公告

今日签到

点亮在社区的每一天
去签到