消息中间件 ---------RabbitMQ(三)

发布于:2023-01-04 ⋅ 阅读:(415) ⋅ 点赞:(0)

消息中间件 ---------RabbitMQ(一)
消息中间件 ---------RabbitMQ(二)

5.持久化

如何避免消息丢失?

1) 消费者的手动ACK机制。可以防止业务处理失败。

2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。

是可以将消息进行持久化呢?

要将消息持久化,前提是:队列、Exchange都持久化

5.1.交换机持久化

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U6elRh44-1661911438571)(assets/1532766951432.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rEvOWwBq-1661911438572)(assets/1588122843487.png)]

5.2.队列持久化

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UjE0eqWp-1661911438572)(assets/1532766981230.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NuXjhXKY-1661911438573)(assets/1588122996333.png)]

5.3.消息持久化

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aL3oEiAT-1661911438574)(assets/1532767057491.png)]

5.4.测试

​ 分别测试持久化和非持久化:

​ 1、Send给Recv发送50条消息

​ 2、Recv收到一条消息sleep1秒钟,收到前几条消息后立即关闭

​ 3、重启RabbitMQ观察消息是否丢失

6.Spring AMQP

6.1.简介

Spring有很多不同的项目,其中就有对AMQP的支持:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cSmIm0Ei-1661911438574)(assets/1532767136007.png)]

Spring AMQP的页面:http://spring.io/projects/spring-amqp

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nwAw9tw9-1661911438575)(assets/1532767171063.png)]

注意这里一段描述:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1KAowHFW-1661911438576)(assets/1532767227821.png)]

     Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

6.2.依赖和配置

添加AMQP的启动器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties中添加RabbitMQ地址:

#主机
spring.rabbitmq.host=192.168.233.132
#端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=1111
#虚拟分组
spring.rabbitmq.virtual-host=/

6.3.监听者

在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

@Component
public class Listener {

    /**
     * 监听者接收消息三要素:
     *  1、queue
     *  2、exchange
     *  3、routing key
     */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value="springboot_queue",durable = "true"),
        exchange = @Exchange(value="springboot_exchage",type= ExchangeTypes.TOPIC),
        key= {"*.*"}
    ))
    public void listen(String msg){
        System.out.println("接收到消息:" + msg);
    }
}
  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
      • value:这个消费者关联的队列。值是@Queue,代表一个队列
      • exchange:队列所绑定的交换机,值是@Exchange类型
      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

6.4.AmqpTemplate

Spring最擅长的事情就是封装,把他人的框架进行封装和整合。

Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NWX76TsT-1661911438576)(assets/1527090258083.png)]

红框圈起来的是比较常用的3个方法,分别是:

  • 指定交换机、RoutingKey和消息体
  • 指定消息
  • 指定RoutingKey和消息,会向默认的交换机发送消息

6.5.发送者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class MqDemo {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException {
        String msg = "hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
        // 等待10秒后再结束
        Thread.sleep(10000);
    }
}

运行后查看日志:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwW1goMF-1661911438577)(assets/1532767726274.png)]

6.6.手动ack

6.6.1.application.properties

#设置三种订阅模式手动ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#设置work消息类型手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

6.6.2.监听者

注意:监听方法添加Channel channel, Message message两个参数

package com.zzcsy.springboot;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class Recv {
    /**
     * 监听者接收消息三要素:
     *  1、queue
     *  2、exchange
     *  3、routing key
     */
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value="springboot_queue",durable = "true"),
        exchange = @Exchange(value="springboot_exchage",type= ExchangeTypes.TOPIC),
        key= {"*.*"}
    ))
    public void listen(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息:" + msg);
        //int a = 6/0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到