极简RabbitMQ快速学习

发布于:2025-02-28 ⋅ 阅读:(12) ⋅ 点赞:(0)

参考文档:黑马程序员
Docs

概念知识

(1)什么是同步?什么是异步?

同步就是阻塞进程,每一步操作严格按顺序执行,获得结果后才返回;异步操作执行不严格按照顺序执行,操作间没有依赖关系

人话:比如开发一个项目,同步就是一个人开发完一个模块后再给另一个人开发,异步就是每个人都同时开发它各自负责的模块

(2)同步和异步各自的优缺点是什么?

同步优点是获得结果后才返回,缺点是扩展性差,模块多了容易出错,如果其中一换出错可能会影响整个流程

异步优点是支持并发操作,耦合度低,缺点是时效性差,有可能出现消息丢失或执行失败

(3)RabbitMQ干什么?

MQ(Message Queue)指消息队列

简单来说就是:原来在微服务的架构里,比如一个接口要调用多个服务,如果是同步操作那就要依次调用三个服务,性能差,而且一个服务出问题整个就gg了;现在用RabbitMQ,在接口必须的部分进行同步操作,完成后把消息存到消息队列,其他服务查看消息队列获取到相应的信息并执行

RabbitMQ结构

publisher:消息生产者
exchange:交换机,负责路由,把消息转发到对应的队列,不负责存储
queue:队列,接收交换机传来的信息并存储,等待消费者处理
consumer:消息消费者

VirtualHost:虚拟主机,你可以理解为一套RabbitMQ集群(包含多个交换机和队列),普通项目用一套就够了,大型项目可能会用多套

交换机类型

这里主要是要明白 Routing Key 和 Binding Key 的区别
Routing Key是路由key,主要是MQ发送消息时指定的
Binding Key是交换机和队列绑定的key,当消息传到交换机后,交换机获得消息的Routing Key,然后那它和与其连接的队列逐个比较,当消息的Routing Key和队列的Binding Key匹配它才会把消息转发给对应的交换机

(1)Fanout 交换机

类似广播,接收到publisher的消息后会把消息转发到所有队列

(2)Direct 交换机

精确匹配,必须当消息的Routing Key和队列的Binding Key完全相同时才会转发

(3)Topic 交换机

Binding Key 中可以使用特殊字符进行模糊匹配
*(匹配一个单词)
#(匹配零个或多个单词)
例如:
binding key 为 order.* 可以匹配 routing key 为 order.create、order.pay 等消息;
binding key 为 order.# 可以匹配 routing key 为 order.create.success 等消息。

Spring AMQP使用方法

SpringAMQP是Spring提供的一个简化MQ开发的模块,使用方法如下:

①添加依赖

  <!--消息发送-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

②在application.yml中配置相关信息

spring:
  rabbitmq:
    host: 192.168.150.101    #你的IP
    port: 5672   #端口
    virtual-host: /hmall    #虚拟主机名
    username: hmall     #用户名
    password: 123    #密码

③生产者核心代码参考


try {
     rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getId());
    } catch (Exception e) {
     log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);
    }

 使用rabbitTemplate.convertAndSend方法,第一个参数是交换机名称,第二个是路由的key

④消费者核心代码参考

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic"),
            key = "pay.success"
    ))
public void listenPaySuccess(Long orderId){
    orderService.markOrderPaySuccess(orderId);
}

在这里,@RabbitListener注解声明了这个方法为消息的监听者(也就是消费者)
bindings = @QueueBinding 用来定义交换机和队列之间的关系

value = @Queue(name = "trade.pay.success.queue", durable = "true")定义队列的信息
name指定队列的名称;durable = "true"表示这个队列是持久化的,不会因为重启而丢失

exchange = @Exchange(name = "pay.topic")定义交换机的信息
name指定交换机的名称

key用来设置路由的key,将该队列绑定到交换机,只接收key为pay.success的信息

当监听到对应的信息后,会触发 listenPaySuccess 方法,尝试从消息中获取orderId参数
 

MQ高级

这里主要是解决使用MQ过程中可能出现的各种问题(比如消息没发给MQ,MQ出错,消费者没从队列收到消息等)

(一)生产者可靠性(确保生产者的消息正确到达MQ)

保证生产者的信息一定能发送到MQ
通常采用生产者确认机制,只要信息能传到交换机,就返回正确的ack
因为在实际生产环境中使用较少,可以略过,要用的时候看看文档对应部分学习即可

(二)MQ可靠性(确保MQ不出问题)

第一个是持久化,包括交换机持久化(设置一下即可)、队列持久化(设置一下即可)、消息持久化(设置发送的message)
第二个是LazyQueue,惰性队列(即将接收到的信息直接存入磁盘,消费者读信息时才会把消息从磁盘加载到内存)

(三)消费者可靠性(确保消费者能正确收到信息)

①消费者确认机制

确认消费者成功收到消息,一般会用消费者确认机制,即消费者处理信息后返回一条信息告知MQ自己处理消息的状态,返回消息有三种:

ack:处理成功,MQ可以把队列中的这条信息删除了
nack:处理失败,MQ请再发一次这条信息
reject:处理失败且拒绝这条消息,MQ我删了你也不要再发了(很少用)

然后这种消息怎么发呢?由于代码固定MQ帮我们写了,我们只需要在配置文件里修改它的判断规律即可

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

none:不作处理,消费者只要接收到消息就返回ack,不安全别用
manual:手动,自己写代码返回,麻烦但灵活
auto:MQ自动帮你搞

②失败重试机制

如果消费者出现异常,没有正确处理队列中的信息,那么消息会重新入队,如果消费者一直出现异常,就会出现消息无限循环入队的情况,我们需要解决这个问题,也就是用消费者失败重试机制

在application.yml配置即可

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

 比如在这里,当一条消息等待1s没有返回ack,就说明没有被消费者正确处理,那就会再等待1s重试,尝试3次后判断失败,会返回一条reject消息

③失败处理策略

有时候不想只返回一条reject,那我们可以自定义一个交换机和队列专门用来收集错误信息
参考day7 3.3即可

④消息幂等性

就是要求多次发送信息不会出bug,出现错误,此时可以通过给消息赋上唯一id的方法来判断
或者修改相应的业务逻辑即可

(四)延迟信息

我们可以声明一个延迟交换机,让信息延时一段时间再进入交换机

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

 定义这个交换机和上面定义普通交换机是类似的

然后在需要发送延时信息的地方

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

补充:我们通常可以把使用MQ的方法集中抽取到一个common模块下面,那么其他模块只需要在它的pom.xml导入即可使用,使用MQ的核心就是Spring AMQP部分那几段代码,其他大部分都是根据这些代码进行的拓展延申