mqtt简介
随着物联网的发展,越来越多的开发同学接开始触物联网平台的开发,其中通信协议是物联网类项目首先要面临的一个选择,MQTT就是一个应用非常广泛的物联网协议。我们可以看MQTT官网的介绍:
MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.
Java sdk选择
作为Java开发者,比较常用的两个mqtt sdk是eclipse的paho和fusesource的mqtt-client。但是直接使用这两个库的话,开发者需要做很多自定义的工作,比如重连处理、异常处理等细节,稍有不慎就会导致连接异常,重连失败等严重的线上问题。这里比较推荐大家使用spring-integration-mqtt,属于spring大家族的子项目,开箱即用,只需要做简单的配置,就可以实现消息收发功能。
引入依赖包
这里有一点需要注意,paho 1.2.1之前版本有严重缺陷,一定要升级到较新版本
<!-- MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<exclusions>
<exclusion>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
yml配置
mqtt:
url: tcp://127.0.0.1:1883
username: your_username
password: your_pwd
clientId: your_client_id
keepalive: 40
subscribeRoot: test/#
spring bean配置
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
private String url;
private String username;
private String password;
private String clientId;
/** 单位:秒*/
private Integer keepalive;
private String subscribeRoot;
}
生产者配置
生产者java bean配置
@Configuration
public class MqttProducerConfig {
@Autowired
private MqttProperties properties;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{properties.getUrl()});
options.setUserName(properties.getUsername());
options.setPassword(properties.getPassword().toCharArray());
options.setKeepAliveInterval(properties.getKeepalive());
factory.setConnectionOptions(options)
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(properties.getClientId() + "-pub-" + System.currentTimeMillis(),
mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test/control");
messageHandler.setDefaultQos(0);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
生产者发送消息封装
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
@Component
public interface MqttProducer {
/**
* 发送消息,默认topic
* @param payload 消息内容
*/
void sendToMqtt(String payload);
/**
* 指定topic进行消息发送
* @param topic 主题
* @param payload 消息内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
/**
* 指定消息qos
* @param topic 主题
* @param qos 消息质量
* @param payload 消息内容
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
消费者配置
java bean配置
@Configuration
@IntegrationComponentScan
public class MqttConsumerConfig {
private static final int QOS = 1;
private final MqttProperties prop;
private final MqttReceiver mqttInboundMessageHandler;
public MqttConsumerConfig(MqttProperties prop, MqttReceiver mqttInboundMessageHandler) {
this.prop = prop;
this.mqttInboundMessageHandler = mqttInboundMessageHandler;
}
@Bean
public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(),
mqttClientFactory,
prop.getSubscribeRoot());
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(QOS);
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler InboundMessageHandler() {
return mqttInboundMessageHandler;
}
@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
}
消费者接受消息封装
@Component
public class MqttReceiver implements MessageHandler {
private static Logger logger = LoggerFactory.getLogger(MqttReceiver.class);
// 线程池消费,避免阻塞broker。
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 30,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
IotThreadFactory.create("consumeMsg", false),
new DiscardOldestAndLogPolicy());
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
logger.info("receive msg: {}={}" , message.getHeaders().get("mqtt_receivedTopic"), message.getPayload());
} catch (Exception e) {
logger.error("consume message error:", e);
}
}
}
broker推荐
- 商用版本hiveMQ
- 社区开源emqx,强烈推荐。社区版本支持mqtt5.0完整协议,并支持集群部署,文档齐全,社区成熟。官网:emqx