Integrating MQTT message sending and receiving into a Spring project

Published: 2023-01-04

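Introduction to MQTT

With the growth of the Internet of Things, more and more developers are starting to work on IoT platforms. The communication protocol is one of the first choices an IoT project has to make, and MQTT is one of the most widely used IoT protocols. Here is how the MQTT website describes it: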

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

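Choosing a Java SDK

For Java developers, the two most commonly used MQTT SDKs are Eclipse Paho and FuseSource mqtt-client. Using either library directly, however, leaves a lot of custom work to the developer, such as reconnection and exception handling, and a small mistake can cause serious production problems like broken connections or failed reconnects. I recommend spring-integration-mqtt instead. It is a subproject of the Spring family, works out of the box, and needs only a little configuration to send and receive messages.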

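Adding the dependencies

One caveat: Paho versions before 1.2.1 have serious defects, so be sure to use 1.2.1 or a newer release. The exclusion below drops the Paho version that spring-integration-mqtt pulls in transitively so that the version can be pinned explicitly.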

<!-- MQTT -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.1</version>
</dependency>

YAML configuration

mqtt:
  url: tcp://127.0.0.1:1883
  username: your_username
  password: your_pwd
  clientId: your_client_id
  keepalive: 40
  subscribeRoot: test/#
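
The subscribeRoot value test/# uses the MQTT multi-level wildcard, so the consumer receives every topic under test/, including test itself. As a rough illustration of how a broker decides whether a topic matches a filter, here is a minimal sketch in plain Java. The class and method names are my own, and it deliberately ignores the special handling of topics that start with $ (such as $SYS), so treat it as a teaching aid rather than a complete implementation.

```java
/** Illustrative MQTT topic-filter matcher; names are hypothetical. */
class TopicFilterDemo {

    /**
     * Returns true if the topic name matches the subscription filter.
     * '+' matches exactly one level; '#' must be the last level and matches
     * the parent level plus any number of child levels.
     */
    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps trailing empty levels, e.g. "test/" has two levels.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: valid only in the last position.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // Without '#', both sides must have the same number of levels.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        String[] topics = {"test/control", "test/a/b", "test", "other/control"};
        for (String topic : topics) {
            System.out.println(topic + " -> " + matches("test/#", topic));
        }
    }
}
```

With the configuration above, messages published to the producer's default topic test/control would therefore also be delivered to the consumer.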

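Spring bean configuration

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the "mqtt.*" entries from the YAML file to typed fields.
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    private String url;
    private String username;
    private String password;
    private String clientId;
    /** Unit: seconds */
    private Integer keepalive;
    private String subscribeRoot;

}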

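Producer configuration

Producer Java bean configuration

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttProducerConfig {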
    @Autowired
    private MqttProperties properties;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{properties.getUrl()});
        options.setUserName(properties.getUsername());
        options.setPassword(properties.getPassword().toCharArray());
        options.setKeepAliveInterval(properties.getKeepalive());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(properties.getClientId() + "-pub-" + System.currentTimeMillis(),
                        mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("test/control");
        messageHandler.setDefaultQos(0);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

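Producer message-sending wrapper

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
@Component
public interface MqttProducer {
    /**
     * Sends a message to the default topic.
     * @param payload message content
     */
    void sendToMqtt(String payload);

    /**
     * Sends a message to the given topic.
     * @param topic topic
     * @param payload message content
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a message to the given topic with the given QoS.
     * @param topic topic
     * @param qos quality of service level
     * @param payload message content
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}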
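
To show how business code might call this gateway, here is a small sketch. So that it compiles on its own, the MqttProducer interface is restated without its Spring annotations; in the real application the injected bean would be the gateway proxy that Spring Integration generates. DeviceCommandService, RecordingProducer and the test/control/<deviceId> topic layout are hypothetical names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Same method signatures as the gateway, restated without Spring annotations. */
interface MqttProducer {
    void sendToMqtt(String payload);
    void sendToMqtt(String topic, String payload);
    void sendToMqtt(String topic, int qos, String payload);
}

/** Hypothetical caller; in a Spring application this would be a @Service. */
class DeviceCommandService {
    private final MqttProducer producer;

    DeviceCommandService(MqttProducer producer) {
        this.producer = producer;
    }

    /** Publishes to the default topic with the default QoS. */
    void broadcast(String payload) {
        producer.sendToMqtt(payload);
    }

    /** Publishes to a per-device topic (assumed layout) with QoS 1. */
    void sendCommand(String deviceId, String payload) {
        producer.sendToMqtt("test/control/" + deviceId, 1, payload);
    }
}

/** In-memory fake that records what would have been published. */
class RecordingProducer implements MqttProducer {
    final List<String> sent = new ArrayList<>();

    @Override
    public void sendToMqtt(String payload) {
        sent.add("<default topic>: " + payload);
    }

    @Override
    public void sendToMqtt(String topic, String payload) {
        sent.add(topic + ": " + payload);
    }

    @Override
    public void sendToMqtt(String topic, int qos, String payload) {
        sent.add(topic + " (qos " + qos + "): " + payload);
    }
}

class ProducerUsageDemo {
    public static void main(String[] args) {
        RecordingProducer fake = new RecordingProducer();
        DeviceCommandService service = new DeviceCommandService(fake);
        service.broadcast("ping");
        service.sendCommand("dev1", "reboot");
        fake.sent.forEach(System.out::println);
    }
}
```

Depending on the interface rather than on MqttPahoMessageHandler keeps callers easy to unit test, because a fake like the one above can stand in for the broker.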

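Consumer configuration

Java bean configuration

import java.time.Instant;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
@IntegrationComponentScan
public class MqttConsumerConfig {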
    private static final int QOS = 1;
    private final MqttProperties prop;
    private final MqttReceiver mqttInboundMessageHandler;

    public MqttConsumerConfig(MqttProperties prop, MqttReceiver mqttInboundMessageHandler) {
        this.prop = prop;
        this.mqttInboundMessageHandler = mqttInboundMessageHandler;
    }

    @Bean
    public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(),
                        mqttClientFactory,
                        prop.getSubscribeRoot());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(QOS);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler inboundMessageHandler() {
        return mqttInboundMessageHandler;
    }

    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }
}

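Consumer message-receiving wrapper

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttReceiver implements MessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(MqttReceiver.class);

    // Consume messages on a thread pool so that slow handlers do not block the MQTT client's callback thread.
    // IotThreadFactory and DiscardOldestAndLogPolicy are project-specific classes that are not listed in this article.
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 30,
            10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(500),
            IotThreadFactory.create("consumeMsg", false),
            new DiscardOldestAndLogPolicy());

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        // Hand the message off to the pool and return immediately.
        threadPoolExecutor.execute(() -> {
            try {
                logger.info("receive msg: {}={}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
                // Business logic goes here.
            } catch (Exception e) {
                logger.error("consume message error:", e);
            }
        });
    }
}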
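
The receiver refers to IotThreadFactory and DiscardOldestAndLogPolicy, which are not listed in this article. The sketch below is only my guess at what such helpers could look like, inferred from their names and from how they are called: a thread factory that names its threads, and a rejection handler that drops the oldest queued task, logs the drop, and retries the new one (the same idea as the JDK's ThreadPoolExecutor.DiscardOldestPolicy). It logs to System.err instead of SLF4J to stay dependency-free, and the discardedCount() accessor is an extra I added for observability.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in: creates threads named "<prefix>-<n>". */
final class IotThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger(1);

    private IotThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    static IotThreadFactory create(String prefix, boolean daemon) {
        return new IotThreadFactory(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, prefix + "-" + counter.getAndIncrement());
        thread.setDaemon(daemon);
        return thread;
    }
}

/** Hypothetical stand-in: drops the oldest queued task, logs it, then retries the new task. */
final class DiscardOldestAndLogPolicy implements RejectedExecutionHandler {
    private final AtomicInteger discarded = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            return; // nothing sensible to do once the pool is shutting down
        }
        Runnable oldest = executor.getQueue().poll();
        if (oldest != null) {
            discarded.incrementAndGet();
            System.err.println("consume queue full, discarded oldest task: " + oldest);
        }
        executor.execute(r); // retry now that a queue slot is free
    }

    int discardedCount() {
        return discarded.get();
    }
}

class PoolSketch {
    public static void main(String[] args) throws InterruptedException {
        // Same sizing as the receiver above: 20 core threads, 30 max, queue of 500.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(20, 30,
                10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(500),
                IotThreadFactory.create("consumeMsg", false),
                new DiscardOldestAndLogPolicy());
        pool.execute(() -> System.out.println("handled on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Dropping the oldest task favours fresh data, which often suits telemetry, but it does mean messages can be lost under sustained overload. If every message matters, a caller-runs policy or a larger queue may be a better fit.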

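Recommended brokers

  1. HiveMQ, a commercial product.
  2. EMQX, an open-source community project, strongly recommended. The community edition supports the full MQTT 5.0 protocol and clustered deployment, and it has thorough documentation and a mature community. Official site: emqx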