文章目录
Spring集成RabbitMQ
1. AMQP&SpringAMQP
- AMQP(高级消息队列协议):Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。是一种面向消息通信的协议,就像HTTP协议是一种浏览器向服务器发消息的协议。
- SpringAMQP:Spring AMOP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。也就是说SpringAMQP只是一种思想,而spring-rabbit是其具体实现
2. SpringBoot集成RabbitMQ
在Maven依赖中引入amqp的起步依赖即可
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在Spring配置文件中配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
# 虚拟主机
virtual-host: /hhy
username: hhy
password: hhy
RabbitTemplate
是Spring封装好的操作RabbitMQ的工具类
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue() {
String queueName = "hhy.q1";
String msg = "hello, mq!666";
rabbitTemplate.convertAndSend(queueName, msg);
}
消费者
@Component
public class MqListener {
@RabbitListener(queues = "hhy.q1")
public void listenSimpleQueue(String msg){
System.out.println("hhy.q1的消息:【" + msg +"】");
}
}
3. 模型
work模型
假设消息生产者生产消息的速度非常的快,消息消费者消费消息的速度赶不上生产的速度,就会导致MQ队列中的消息越来越多,从而导致消息堆积问题,如何处理消息堆积问题?
- 让多个消费者绑定一个队列,加快消息处理速度
- 还可以在代码层面使用异步操作,比说线程池
绑定多个消费者,每个消费者的处理能力也可能不一致,而Spring默认将消息以轮询的方式发送给多个消费者,处理能力慢的消费者还是会影响处理速度,此时就可以通过添加配置prefetch
让消费者只获取一条消息处理完成后再获取,进一步避免消息堆积问题
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
work模型就是多个消费者绑定一个队列
@Component
public class MqListener {
@RabbitListener(queues = "work.q")
public void workListen1(String msg){
System.out.println("消费者1:work.q的消息:【" + msg +"】");
}
@RabbitListener(queues = "work.q")
public void workListen2(String msg){
System.err.println("消费者2:work.q的消息:【" + msg +"】");
}
}
4.交换机
上诉实例代码中并没有使用交换机,生产者是直接将消息发送到队列中,实际这种方式是不合理的,假设多个服务都需要订阅同一条消息这种方式就无法满足需求了,那么就要引入交换机。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
Fanout交换机其实就是广播,将生产者发布的消息广播给绑定的自身的所有消息队列。发送消息流程:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
根据上诉图编写代码
// 消费者1消费队列1
@RabbitListener(queues = "fanout.q1")
public void fanoutListen1(String msg){
System.out.println("消费者1:fanout.q1的消息:【" + msg +"】");
}
// 消费者2消费队列2
@RabbitListener(queues = "fanout.q2")
public void fanoutListen2(String msg){
System.out.println("消费者1:fanout.q2的消息:【" + msg +"】");
}
生产者向Fanout类型交换机发送消息,前提需要创建Fanout类型的交换机
@Test
void testSendFanout() {
// 交换机名称
String exchangeName = "amq.fanout";
String msg = "hello, fanout!";
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
通过key进行绑定,如下图也就是说生产者发送消息时指定key为test
两个消费者内的队列都能收到,key为java
时只有dirct.q1
队列能收到,key为cpp
时只有dirct.q2
队列能收到
消费者代码
@RabbitListener(queues = "direct.q1")
public void fanoutDirect1(String msg){
System.out.println("消费者1:direct.q1的消息:【" + msg +"】");
}
@RabbitListener(queues = "direct.q2")
public void fanoutDirect2(String msg){
System.out.println("消费者2:direct.q2的消息:【" + msg +"】");
}
生产者代码
生产者在指定消息时指定不同的key来发送消息
@Test
void testSendDirect() {
String exchangeName = "hhy.direct";
String msg = "所有队列都能收到该消息";
rabbitTemplate.convertAndSend(exchangeName, "test", msg);
}
@Test
void testSendDirect() {
String exchangeName = "hhy.direct";
String msg = "只有队列direct.q1能收到消息";
rabbitTemplate.convertAndSend(exchangeName, "java", msg);
}
@Test
void testSendDirect() {
String exchangeName = "hhy.direct";
String msg = "只有队列direct.q2能收到消息";
rabbitTemplate.convertAndSend(exchangeName, "cpp", msg);
}
Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!也就是说Topic
交换机是非常灵活的,Bindingkey
支持模糊匹配。
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如: china.hunan
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
假设有多个队列绑定的Bindingkey分别为:
china.hunan.chenzhou.weather
:湖南郴州的天气china.hunan.chenzhou.news
:湖南郴州的新闻china.zhejiang.hangzhou.weather
:浙江杭州的天气japan.tokyo.news
:日本东京的新闻
那么使用通配符:
china.hunan.#
:表示接受湖南的所有新闻和天气消息#.news
:表示接受所有新闻消息china.hunan.*.news
:表示接受湖南省各个市区的新闻
建立绑定关系:
代码实例
// 消费者
@RabbitListener(queues = "topic.q1")
public void topicListen1(String msg){
System.out.println("消费者1:topic.q1的消息:【" + msg +"】");
}
@RabbitListener(queues = "topic.q2")
public void topicListen2(String msg){
System.out.println("消费者2:topic.q2的消息:【" + msg +"】");
}
生产者代码
这一条消息topic.q1
和topic.q2
两个队列都能收到消息,因为它们和交换机绑定的关系的时候指定的KEY:
#.news
:接受所有地方的新闻china.hunan.#
:接受湖南的新闻和天气
@Test
void testSendTopic() {
// 交换机名称
String exchangeName = "hhy.topic";
String key = "china.hunan.chenzhou.news";
String msg = "这是一条湖南郴州的新闻!";
rabbitTemplate.convertAndSend(exchangeName, key, msg);
}
下面这条消息只有topic.q2
能收到,因为topic.q2
和交换机绑定时指定的KEY为china.hunan.#
,接受湖南的所有天气和新闻消息
@Test
void testSendTopic() {
// 交换机名称
String exchangeName = "hhy.topic";
String key = "china.hunan.chenzhou.weather";
String msg = "郴州今天多云转晴";
rabbitTemplate.convertAndSend(exchangeName, key, msg);
}
小结:
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
5.声明式队列和交换机
通过RabbitMQ提供的管理页面创建队列和交换机比较麻烦,SpringAMQP提供了对应API方便开发者来创建队列和交换机。
基于API声明
通过Spring提供的API创建fanout交换机和队列并建立绑定关系
@Configuration
public class FanoutConfiguration {
/**
* 声明式创建fanout交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hhy.fanout");
}
/**
* 声明式创建队列
* @return
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 声明式创建绑定关系
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding fanoutBinding3(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
}
但如果使用这种方式创建Direct交换机就会非常麻烦,因为如果要绑定时要指定多个Key就会出现很多冗余代码,每绑定一个不同的Key就需要多写一份代码
@Configuration
public class DirectConfiguration {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("test.direct");
}
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
@Bean
public Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
@Bean
public Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
@Bean
public Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
@Bean
public Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
基于注解声明
基于@Bean的方式声明队列和交换机的方式比价麻烦,代码有点冗余,Spring还为我们提供基于注解的方式来声明。
使用注解的方式声明Direct
模式的交换机和队列,通过注解声明这种创建方式更简单清爽,一个注解直接创建交换机并且绑定队列。并且对应消费者直接就可以监听队列接收消息
@Component
public class MqListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenSimpleQueue1(String msg){
System.out.println("消费者1:收到了simple.queue的消息:【" + msg +"】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenSimpleQueue2(String msg){
System.out.println("消费者2:收到了simple.queue的消息:【" + msg +"】");
}
}
6.消息转换器
前面我们生产者发送的消息都是一些字符串,当我们发送的消息是一个对象的时候就会出现问题。
@Test
void testSendObject() {
String exchangeName = "test.direct";
Map<String, Object> msg = new HashMap<>(2);
msg.put("name", "jack");
msg.put("age", 21);
rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}
如下图RabbitMQ中的消息队列中存储的消息,数据类型是通过JDK自带的序列化后的数据
而JDK自带的序列化,存在以下问题:
- 消息体积大
- 毫无可读性
- 有安全漏洞,利用Java字节码反序列化能被替换恶意代码
所以使用JDK自带的序列化方式并不合适,那么我可以使用JSON的序列化方式来解决这个问题。
使用jackson就行,引入jackson依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
将消息转换器交给Spring管理
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}