SpringBoot整合MQTT实战:基于EMQX构建高可靠物联网通信,从零到一实现设备云端双向对话

发布于:2025-05-15 ⋅ 阅读:(15) ⋅ 点赞:(0)

一、引言

随着物联网(IoT)技术的快速发展,MQTT(Message Queuing Telemetry Transport)协议因其轻量级、低功耗和高效的特点,已成为物联网设备通信的事实标准。本文将详细介绍如何使用SpringBoot框架整合MQTT协议,基于开源MQTT代理EMQX实现设备与服务器之间的双向通信。

二、技术选型与环境准备

2.1 技术栈介绍

  • SpringBoot 2.7.x:简化Spring应用初始搭建和开发过程

  • EMQX 5.0:开源的大规模分布式MQTT消息服务器

  • Eclipse Paho:流行的MQTT客户端库

  • Lombok:简化Java Bean编写

2.2 环境准备

  1. 安装EMQX服务器(可使用Docker快速部署):

    docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14

  2. 确保Java开发环境(JDK 11+)和Maven已安装

三、SpringBoot项目集成MQTT

3.1 创建SpringBoot项目并添加依赖

pom.xml中添加必要的依赖:

<dependencies>
    <!-- SpringBoot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <!-- MQTT Paho Client -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <!-- JSON处理 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

3.2 配置MQTT连接参数

application.yml中添加配置:

mqtt:
  broker-url: tcp://localhost:1883
  username: emqx
  password: public
  client-id: springboot-server
  default-topic: device/status
  timeout: 30
  keepalive: 60
  qos: 1
  clean-session: true

创建配置类MqttProperties.java

@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private String brokerUrl;
    private String username;
    private String password;
    private String clientId;
    private String defaultTopic;
    private int timeout;
    private int keepalive;
    private int qos;
    private boolean cleanSession;
}

3.3 实现MQTT客户端配置

创建MqttConfiguration.java

@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {
    
    private final MqttProperties mqttProperties;
    
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setConnectionTimeout(mqttProperties.getTimeout());
        options.setKeepAliveInterval(mqttProperties.getKeepalive());
        options.setCleanSession(mqttProperties.isCleanSession());
        options.setAutomaticReconnect(true);
        return options;
    }
    
    @Bean
    public IMqttClient mqttClient() throws MqttException {
        IMqttClient client = new MqttClient(
            mqttProperties.getBrokerUrl(), 
            mqttProperties.getClientId(), 
            new MemoryPersistence()
        );
        client.connect(mqttConnectOptions());
        return client;
    }
}

3.4 实现MQTT消息发布服务

创建MqttPublisher.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {
    
    private final IMqttClient mqttClient;
    private final MqttProperties mqttProperties;
    
    public void publish(String topic, String payload) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(mqttProperties.getQos());
        message.setRetained(true);
        
        mqttClient.publish(topic, message);
        
        log.info("MQTT message published to topic: {}, payload: {}", topic, payload);
    }
    
    public void publish(String payload) throws MqttException {
        publish(mqttProperties.getDefaultTopic(), payload);
    }
}

3.5 实现MQTT消息订阅服务

创建MqttSubscriber.java

@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {
    
    private final IMqttClient mqttClient;
    private final MqttProperties mqttProperties;
    
    @PostConstruct
    public void init() throws MqttException {
        subscribe(mqttProperties.getDefaultTopic());
    }
    
    public void subscribe(String topic) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        
        mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);
        log.info("Subscribed to MQTT topic: {}", topic);
    }
    
    private void handleMessage(String topic, MqttMessage message) {
        String payload = new String(message.getPayload());
        log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);
        
        // 这里可以添加业务逻辑处理接收到的消息
        processMessage(topic, payload);
    }
    
    private void processMessage(String topic, String payload) {
        // 示例:解析JSON格式的消息
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode jsonNode = mapper.readTree(payload);
            
            // 根据不同的topic和payload内容进行业务处理
            if (topic.startsWith("device/status")) {
                handleDeviceStatus(jsonNode);
            } else if (topic.startsWith("device/control")) {
                handleDeviceControl(jsonNode);
            }
        } catch (JsonProcessingException e) {
            log.error("Failed to parse MQTT message payload: {}", payload, e);
        }
    }
    
    private void handleDeviceStatus(JsonNode jsonNode) {
        // 处理设备状态上报
        String deviceId = jsonNode.get("deviceId").asText();
        String status = jsonNode.get("status").asText();
        log.info("Device {} status updated to: {}", deviceId, status);
    }
    
    private void handleDeviceControl(JsonNode jsonNode) {
        // 处理设备控制指令响应
        String deviceId = jsonNode.get("deviceId").asText();
        String command = jsonNode.get("command").asText();
        String result = jsonNode.get("result").asText();
        log.info("Device {} executed command {} with result: {}", deviceId, command, result);
    }
}

四、实现双向通信

4.1 服务器向设备发送控制指令

创建REST API接口用于发送控制指令:

@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {
    
    private final MqttPublisher mqttPublisher;
    
    @PostMapping("/control")
    public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            String payload = mapper.writeValueAsString(command);
            
            String topic = "device/control/" + command.getDeviceId();
            mqttPublisher.publish(topic, payload);
            
            return ResponseEntity.ok("Control command sent successfully");
        } catch (Exception e) {
            log.error("Failed to send control command", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Failed to send control command: " + e.getMessage());
        }
    }
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class DeviceCommand {
        private String deviceId;
        private String command;
        private Map<String, Object> params;
    }
}

4.2 设备模拟客户端

为了测试双向通信,我们可以创建一个简单的设备模拟客户端:

@Component
@Slf4j
public class DeviceSimulator {
    
    private final MqttPublisher mqttPublisher;
    private final MqttProperties mqttProperties;
    private IMqttClient deviceClient;
    
    public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {
        this.mqttPublisher = mqttPublisher;
        this.mqttProperties = mqttProperties;
        initDeviceClient();
    }
    
    private void initDeviceClient() {
        try {
            String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);
            deviceClient = new MqttClient(
                mqttProperties.getBrokerUrl(), 
                deviceId, 
                new MemoryPersistence()
            );
            
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setAutomaticReconnect(true);
            
            deviceClient.connect(options);
            
            // 订阅控制主题
            String controlTopic = "device/control/" + deviceId;
            deviceClient.subscribe(controlTopic, (topic, message) -> {
                String payload = new String(message.getPayload());
                log.info("Device received control command: {}", payload);
                
                // 模拟设备执行命令并返回响应
                executeCommand(payload, deviceId);
            });
            
            // 模拟设备定期上报状态
            simulatePeriodicStatusReport(deviceId);
            
        } catch (MqttException e) {
            log.error("Failed to initialize device simulator", e);
        }
    }
    
    private void executeCommand(String payload, String deviceId) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode jsonNode = mapper.readTree(payload);
            
            String command = jsonNode.get("command").asText();
            
            // 模拟命令执行
            Thread.sleep(1000); // 模拟执行耗时
            
            // 构造响应
            ObjectNode response = mapper.createObjectNode();
            response.put("deviceId", deviceId);
            response.put("command", command);
            response.put("result", "success");
            response.put("timestamp", System.currentTimeMillis());
            
            // 发布响应
            String responseTopic = "device/control/response/" + deviceId;
            mqttPublisher.publish(responseTopic, response.toString());
            
        } catch (Exception e) {
            log.error("Failed to execute command", e);
        }
    }
    
    private void simulatePeriodicStatusReport(String deviceId) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                ObjectMapper mapper = new ObjectMapper();
                ObjectNode status = mapper.createObjectNode();
                status.put("deviceId", deviceId);
                status.put("status", "online");
                status.put("cpuUsage", Math.random() * 100);
                status.put("memoryUsage", 30 + Math.random() * 50);
                status.put("timestamp", System.currentTimeMillis());
                
                String topic = "device/status/" + deviceId;
                mqttPublisher.publish(topic, status.toString());
                
            } catch (Exception e) {
                log.error("Failed to send status report", e);
            }
        }, 0, 10, TimeUnit.SECONDS);
    }
}

五、测试与验证

5.1 测试设备状态上报

  1. 启动SpringBoot应用

  2. 观察日志输出,应该能看到设备模拟客户端定期上报状态信息

5.2 测试服务器控制指令

使用Postman或curl发送控制指令:

curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{
    "deviceId": "device-123456",
    "command": "restart",
    "params": {
        "delay": 5
    }
}'

5.3 验证双向通信

  1. 服务器发送控制指令到特定设备

  2. 设备接收指令并执行

  3. 设备发送执行结果回服务器

  4. 服务器接收并处理设备响应

六、高级功能扩展

6.1 消息持久化与QoS级别

  • QoS 0:最多一次,消息可能丢失

  • QoS 1:至少一次,消息不会丢失但可能重复

  • QoS 2:恰好一次,消息不丢失且不重复

根据业务需求选择合适的QoS级别:

// 在发布消息时设置QoS
message.setQos(2); // 使用最高级别的QoS

6.2 安全配置

  1. 启用TLS加密:

mqtt:
  broker-url: ssl://localhost:8883
  1. 配置EMQX的ACL规则,限制客户端权限

6.3 集群部署

对于生产环境,可以部署EMQX集群:

# 启动第一个节点
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14

# 启动第二个节点
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14

6.4 消息桥接与WebHook

通过EMQX的桥接功能,可以将消息转发到其他MQTT服务器或Kafka等消息队列。也可以通过WebHook将消息推送到HTTP服务。

七、总结

本文详细介绍了如何使用SpringBoot整合MQTT协议,基于EMQX实现设备与服务器之间的双向通信。主要内容包括:

  1. SpringBoot项目中集成MQTT客户端

  2. 实现消息发布和订阅功能

  3. 设计双向通信机制

  4. 设备模拟与测试验证

  5. 高级功能扩展建议

这种架构非常适合物联网场景,能够支持海量设备连接和实时消息通信。开发者可以根据实际业务需求,在此基础上进行扩展和优化,构建稳定可靠的物联网平台。

八、参考资料

  1. EMQX官方文档:Introduction | EMQX 5.0 Docs

  2. Eclipse Paho项目:Eclipse Paho | The Eclipse Foundation

  3. MQTT协议规范:MQTT Version 3.1.1

  4. Spring Boot官方文档:Spring Boot