参考文档:黑马程序员
Docs
概念知识
(1)什么是同步?什么是异步?
同步就是阻塞进程,每一步操作严格按顺序执行,获得结果后才返回;异步操作执行不严格按照顺序执行,操作间没有依赖关系
人话:比如开发一个项目,同步就是一个人开发完一个模块后再给另一个人开发,异步就是每个人都同时开发它各自负责的模块
(2)同步和异步各自的优缺点是什么?
同步优点是获得结果后才返回,缺点是扩展性差,模块多了容易出错,如果其中一换出错可能会影响整个流程
异步优点是支持并发操作,耦合度低,缺点是时效性差,有可能出现消息丢失或执行失败
(3)RabbitMQ干什么?
MQ(Message Queue)指消息队列
简单来说就是:原来在微服务的架构里,比如一个接口要调用多个服务,如果是同步操作那就要依次调用三个服务,性能差,而且一个服务出问题整个就gg了;现在用RabbitMQ,在接口必须的部分进行同步操作,完成后把消息存到消息队列,其他服务查看消息队列获取到相应的信息并执行
RabbitMQ结构

publisher:消息生产者
exchange:交换机,负责路由,把消息转发到对应的队列,不负责存储
queue:队列,接收交换机传来的信息并存储,等待消费者处理
consumer:消息消费者
VirtualHost:虚拟主机,你可以理解为一套RabbitMQ集群(包含多个交换机和队列),普通项目用一套就够了,大型项目可能会用多套
交换机类型
这里主要是要明白 Routing Key 和 Binding Key 的区别
Routing Key是路由key,主要是MQ发送消息时指定的
Binding Key是交换机和队列绑定的key,当消息传到交换机后,交换机获得消息的Routing Key,然后那它和与其连接的队列逐个比较,当消息的Routing Key和队列的Binding Key匹配它才会把消息转发给对应的交换机
(1)Fanout 交换机
类似广播,接收到publisher的消息后会把消息转发到所有队列
(2)Direct 交换机
精确匹配,必须当消息的Routing Key和队列的Binding Key完全相同时才会转发
(3)Topic 交换机
Binding Key 中可以使用特殊字符进行模糊匹配
*(匹配一个单词)
#(匹配零个或多个单词)
例如:
binding key 为 order.* 可以匹配 routing key 为 order.create、order.pay 等消息;
binding key 为 order.# 可以匹配 routing key 为 order.create.success 等消息。
Spring AMQP使用方法
SpringAMQP是Spring提供的一个简化MQ开发的模块,使用方法如下:
①添加依赖
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②在application.yml中配置相关信息
spring:
rabbitmq:
host: 192.168.150.101 #你的IP
port: 5672 #端口
virtual-host: /hmall #虚拟主机名
username: hmall #用户名
password: 123 #密码
③生产者核心代码参考
try {
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getId());
} catch (Exception e) {
log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
}
使用rabbitTemplate.convertAndSend方法,第一个参数是交换机名称,第二个是路由的key
④消费者核心代码参考
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.topic"),
key = "pay.success"
))
public void listenPaySuccess(Long orderId){
orderService.markOrderPaySuccess(orderId);
}
在这里,@RabbitListener注解声明了这个方法为消息的监听者(也就是消费者)
bindings = @QueueBinding 用来定义交换机和队列之间的关系
value = @Queue(name = "trade.pay.success.queue", durable = "true")定义队列的信息
name指定队列的名称;durable = "true"表示这个队列是持久化的,不会因为重启而丢失
exchange = @Exchange(name = "pay.topic")定义交换机的信息
name指定交换机的名称
key用来设置路由的key,将该队列绑定到交换机,只接收key为pay.success的信息
当监听到对应的信息后,会触发 listenPaySuccess 方法,尝试从消息中获取orderId参数
MQ高级
这里主要是解决使用MQ过程中可能出现的各种问题(比如消息没发给MQ,MQ出错,消费者没从队列收到消息等)
(一)生产者可靠性(确保生产者的消息正确到达MQ)
保证生产者的信息一定能发送到MQ
通常采用生产者确认机制,只要信息能传到交换机,就返回正确的ack
因为在实际生产环境中使用较少,可以略过,要用的时候看看文档对应部分学习即可
(二)MQ可靠性(确保MQ不出问题)
第一个是持久化,包括交换机持久化(设置一下即可)、队列持久化(设置一下即可)、消息持久化(设置发送的message)
第二个是LazyQueue,惰性队列(即将接收到的信息直接存入磁盘,消费者读信息时才会把消息从磁盘加载到内存)
(三)消费者可靠性(确保消费者能正确收到信息)
①消费者确认机制
确认消费者成功收到消息,一般会用消费者确认机制,即消费者处理信息后返回一条信息告知MQ自己处理消息的状态,返回消息有三种:
ack:处理成功,MQ可以把队列中的这条信息删除了
nack:处理失败,MQ请再发一次这条信息
reject:处理失败且拒绝这条消息,MQ我删了你也不要再发了(很少用)
然后这种消息怎么发呢?由于代码固定MQ帮我们写了,我们只需要在配置文件里修改它的判断规律即可
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
none:不作处理,消费者只要接收到消息就返回ack,不安全别用
manual:手动,自己写代码返回,麻烦但灵活
auto:MQ自动帮你搞
②失败重试机制
如果消费者出现异常,没有正确处理队列中的信息,那么消息会重新入队,如果消费者一直出现异常,就会出现消息无限循环入队的情况,我们需要解决这个问题,也就是用消费者失败重试机制
在application.yml配置即可
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
比如在这里,当一条消息等待1s没有返回ack,就说明没有被消费者正确处理,那就会再等待1s重试,尝试3次后判断失败,会返回一条reject消息
③失败处理策略
有时候不想只返回一条reject,那我们可以自定义一个交换机和队列专门用来收集错误信息
参考day7 3.3即可
④消息幂等性
就是要求多次发送信息不会出bug,出现错误,此时可以通过给消息赋上唯一id的方法来判断
或者修改相应的业务逻辑即可
(四)延迟信息
我们可以声明一个延迟交换机,让信息延时一段时间再进入交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
定义这个交换机和上面定义普通交换机是类似的
然后在需要发送延时信息的地方
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
补充:我们通常可以把使用MQ的方法集中抽取到一个common模块下面,那么其他模块只需要在它的pom.xml导入即可使用,使用MQ的核心就是Spring AMQP部分那几段代码,其他大部分都是根据这些代码进行的拓展延申