消息中间件 ---------RabbitMQ(一)
消息中间件 ---------RabbitMQ(二)
RabbitMQ(三)
5.持久化
如何避免消息丢失?
1) 消费者的手动ACK机制。可以防止业务处理失败。
2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。
是可以将消息进行持久化呢?
要将消息持久化,前提是:队列、Exchange都持久化
5.1.交换机持久化
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U6elRh44-1661911438571)(assets/1532766951432.png)]](https://img-blog.csdnimg.cn/6a39c527eed54800966f2af9bfc8f475.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rEvOWwBq-1661911438572)(assets/1588122843487.png)]](https://img-blog.csdnimg.cn/3c6555b8ec6745399523fa0e09c34324.png)
5.2.队列持久化
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UjE0eqWp-1661911438572)(assets/1532766981230.png)]](https://img-blog.csdnimg.cn/ba7ee0905fd04417973c4801e5e27a0a.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NuXjhXKY-1661911438573)(assets/1588122996333.png)]](https://img-blog.csdnimg.cn/60e57f4b3504492c823a115aa1f5f89a.png)
5.3.消息持久化
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aL3oEiAT-1661911438574)(assets/1532767057491.png)]](https://img-blog.csdnimg.cn/d90edb52837b40b2a5eea52e4a5e8766.png)
5.4.测试
分别测试持久化和非持久化:
1、Send给Recv发送50条消息
2、Recv收到一条消息sleep1秒钟,收到前几条消息后立即关闭
3、重启RabbitMQ观察消息是否丢失
6.Spring AMQP
6.1.简介
Spring有很多不同的项目,其中就有对AMQP的支持:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cSmIm0Ei-1661911438574)(assets/1532767136007.png)]](https://img-blog.csdnimg.cn/48638d2811ad4be99b13b3a3cbc41c85.png)
Spring AMQP的页面:http://spring.io/projects/spring-amqp
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nwAw9tw9-1661911438575)(assets/1532767171063.png)]](https://img-blog.csdnimg.cn/9acd306f8f944f6eb3c09d1e6dacbc8a.png)
注意这里一段描述:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1KAowHFW-1661911438576)(assets/1532767227821.png)]](https://img-blog.csdnimg.cn/830b8b328b674e3d83cb29176bac49f4.png)
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
6.2.依赖和配置
添加AMQP的启动器:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.properties中添加RabbitMQ地址:
#主机
spring.rabbitmq.host=192.168.233.132
#端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=1111
#虚拟分组
spring.rabbitmq.virtual-host=/
6.3.监听者
在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。
@Component
public class Listener {
/**
* 监听者接收消息三要素:
* 1、queue
* 2、exchange
* 3、routing key
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value="springboot_queue",durable = "true"),
exchange = @Exchange(value="springboot_exchage",type= ExchangeTypes.TOPIC),
key= {"*.*"}
))
public void listen(String msg){
System.out.println("接收到消息:" + msg);
}
}
@Componet:类上的注解,注册到Spring容器@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:value:这个消费者关联的队列。值是@Queue,代表一个队列exchange:队列所绑定的交换机,值是@Exchange类型key:队列和交换机绑定的RoutingKey
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
6.4.AmqpTemplate
Spring最擅长的事情就是封装,把他人的框架进行封装和整合。
Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NWX76TsT-1661911438576)(assets/1527090258083.png)]](https://img-blog.csdnimg.cn/9784ff78758f4a53a296bc7d8d437617.png)
红框圈起来的是比较常用的3个方法,分别是:
- 指定交换机、RoutingKey和消息体
- 指定消息
- 指定RoutingKey和消息,会向默认的交换机发送消息
6.5.发送者
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class MqDemo {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "hello, Spring boot amqp";
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
// 等待10秒后再结束
Thread.sleep(10000);
}
}
运行后查看日志:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwW1goMF-1661911438577)(assets/1532767726274.png)]](https://img-blog.csdnimg.cn/42042d8b3c734c6980db1a701d7e4efd.png)
6.6.手动ack
6.6.1.application.properties
#设置三种订阅模式手动ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#设置work消息类型手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
6.6.2.监听者
注意:监听方法添加Channel channel, Message message两个参数
package com.zzcsy.springboot;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class Recv {
/**
* 监听者接收消息三要素:
* 1、queue
* 2、exchange
* 3、routing key
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value="springboot_queue",durable = "true"),
exchange = @Exchange(value="springboot_exchage",type= ExchangeTypes.TOPIC),
key= {"*.*"}
))
public void listen(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息:" + msg);
//int a = 6/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}