这两天有需求需要使用rabbitMQ,正好有时间整理一下rabbitmq的使用方法
第一步, 在pom.xml中引入,这里就不写了.
第二步, 在nacos中添加rabbitmq的配置文件
ryhj:
mq-queue-name: myqueueName
mq-exchange-name: myExchangeName
mq-routing-key: myKey
spring:
rabbitmq:
host: 192.168.1.110
password: mypassword
port: 5672
username: yourUsername
上方的名称可以根据需要自己更改
第三步:
生产者的方法。 我们这次使用伊特TestControlller来进行测试
mport cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.apex.iot.data.model.DataModel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
@RestController
@RequestMapping("/test")
public class TestController {
@Resource
private RabbitTemplate rabbitTemplate;
// 此处是在nacos中获取的交换机名字
@Value("${ryhj.mq-exchange-name}")
private String mqExchangeName;
// 此处是在nacos中获取的路由键
@Value("${ryhj.mq-routing-key}")
private String mqRoutingKey;
@RequestMapping("/test")
public void test(){
System.err.println(1);
DataModel data=new DataModel();
data.setDeviceParentName("吸附机器");
data.setDeviceId("XL_1");
data.setDeviceName("吸附");
data.setIp("127.0.0.1");
data.setValue(true);
data.setValueType("a");
data.setCreateTime(new Date());
data.setStatus(0);
JSON parse = JSONUtil.parse(data);
// 这一步是将数据转变为json的关键,要不然接收后会乱码
String result = com.alibaba.fastjson.JSON.toJSONString(data);
rabbitTemplate.convertAndSend(mqExchangeName, mqRoutingKey, result);
}
}
第四步,消费者写的方法:
package com.apex.iot.rabbitmq;
import com.apex.iot.device.service.DeviceService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* rabbitMq的mq监听
* @createDate 2025-03-13
*/
@Component
@Configuration
@Slf4j
public class RabbitMqListener {
/**引入部分分别是队列名,交换机以及路由键
**/
@RabbitHandler
@RabbitListener(bindings = {
@QueueBinding(value = @Queue("#{'${ryhj.mq-queue-name}'}"),
exchange = @Exchange(value = "#{'${ryhj.mq-exchange-name}'}",type = "direct"),
key = "#{'${ryhj.mq-routing-key}'}")
})
public void process(String msg, Message message, Channel channel) throws IOException {
try {
//数据处理
System.err.println("msg:"+msg);
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e) {
try {
// 拒绝消息并重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
log.error("处理mq设备数据失败!数据:{}", msg, e);
}catch (Exception ee) {
log.error("处理mq设备信息失败!数据:{}", msg, ee);
}
}
}
}
这样写完以后,mq处理基本已经完成了,但是我接收的时候会报错。报错内容如下:
2025-03-18 10:09:22.038 WARN 31944 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
这个错误信息表明,Spring AMQP 在处理 RabbitMQ 消息时失败了,具体原因是消息转换失败。
所以我们可以在消息监听者这边添加一个mq的配置类,具体如下:
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory, MessageConverter messageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory); // 设置连接工厂
factory.setMessageConverter(messageConverter); // 设置消息转换器
return factory;
}
}
这样,再次启动,就完成啦。
如果接收有乱码信息出现,如下图:
sr cn.hutool.json.JSONObject j Na
v L configt Lcn/hutool/json/JSONConfig;xr cn.hutool.core.map.MapWrapper T#A_ r L rawt Ljava/util/Map;xpsr java.util.LinkedHashMap4 N\l Z accessOrderxr java.util.HashMap ` F
loadFactorI thresholdxp?@ w t deviceIdt QY_XFXL_1t
deviceNamet
吸附系列1t deviceParentNamet 吸附机器t ipt 127.0.0.1t valuesr java.lang.Boolean r ՜ Z valuexpt valueTypet at
createTimesr java.util.Datehj KYt xpw T
xt statussr java.lang.Integer⠤ 8 I valuexr java.lang.Number xp x q ~ xsr cn.hutool.json.JSONConfig ^ L Z checkDuplicateZ
ignoreCaseZ ignoreErrorZ ignoreNullValueZ stripTrailingZerosZ transientSupportL
dateFormatt Ljava/lang/String;L
keyComparatort Ljava/util/Comparator;xp pp
这样,我们在生产者上编辑一下json就可以了。
上方已经备注了。就这一行
String result = com.alibaba.fastjson.JSON.toJSONString(data)