RabbitMinQ(模拟实现消息队列项目)

发布于:2025-09-02 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

一.消息队列背景

二.需求分析

核心概念:

BrokerServer:

BrokerServer的核心API:

交换机Exchange:

持久化:

网络通信:

消息应答:

三、模块划分

四、创建项目

五、创建核心类

Exchange:

MSGQueue:

Binding:

Message:

六.数据库设计

七.实现DataBaseManager类:

DataBaseManager类:

编写DataBaseManager类的测试用例:

八.消息存储设置

九.创建MessageFileManager类

创建MessageFileManager类:

实现该类的功能:

1.创建数据文件:统计文件的目录,文件

2.实现读写统计文件功能:

3.实现读写数据文件功能:

4.实现读写消息方法:

5.加载文件中的所有有效消息;

6.实现垃圾清除功能:GC:

编写MessageFileManager测试用例:

十.整合数据库和文件数据


一.消息队列背景

我们学习过阻塞队列(BlockingQueue), 阻塞队列最⼤的⽤途,就是⽤来实现 ⽣产者消费 者模型. ⽣产者消费者模型,存在诸多好处,是后端开发的常⽤编程⽅式.

• 解耦合

• 削峰填⾕

在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求.因此,我们通常会把阻塞队列,封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能.这样的程序我们就称为消息队列(Message Queue,MQ).

市面上成熟的消息队列有很多:kafka,RabbitMQ,RocketMQ.....

接下来我们就以RabbitMQ为模版,实现他的核心功能.

二.需求分析

核心概念:

生产者Producter,消费者Consumer,中间人Broker,发布publisher,订阅Subscribe.

⼀个⽣产者,⼀个消费者

多个⽣产者,多个消费者

生产者和消费者存在1对1和多对多等多种关系模式.

生产者发布消息,到达Broker,Broker将消息发送给订阅该消息的消费者.Broker是消息队列的核心,负责存储和转发消息.

BrokerServer:

在broker内部,又存在一下概念:

虚拟机VirtualHost,交换机Exchange,队列Queue,绑定关系Binding

虚拟机类似于数据库的database,起到数据分割和管理的作用,一个BrokerServer内部可以有多个虚拟机.

交换机Exchange:生产者将消息发送到Broker的交换机上,交换机根据不同的路由规则,将消息分发到不同的队列上。

队列Queue:接收并存储交换机分发的消息,根据消费者的订阅队列的情况,将消息推送给消息者。(消费者自己决定从哪个队列上读取消息)

绑定关系Binding:这个绑定关系,指的是交换机和队列的绑定关系。(交换机和队列存在多对多的绑定关系)

消息Message:真正存储的数据。

BrokerServer的核心API:

对于BrokerServer来说,要实现以下核心API,通过这些API来实现消息队列的基本功能:

1. 创建队列 (queueDeclare)

2. 销毁队列 (queueDelete)

3. 创建交换机 (exchangeDeclare)

4. 销毁交换机 (exchangeDelete)

5. 创建绑定 (queueBind)

6. 解除绑定 (queueUnbind)

7. 发布消息 (basicPublish)

8. 订阅消息 (basicConsume)

9. 确认消息 (basicAck)

生产者和消费者通过远程调用这些API,实现生产者 消费者模型。

交换机Exchange:

对于RabbitMQ来说,支持4种交换机类型:

直接交换机Direct,扇出交换机 Fanout ,主题交换机Topic ,首部交换机Header

Header交换机比较复杂且实用场景较少,目前我们仅实现前3种。

直接交换机Direct:⽣产者发送消息时,直接指定被该交换机绑定的队列名。(发放专属红包)

扇出交换机 Fanout :⽣产者发送的消息会被复制到该交换机的所有队列中. (发一个红包,所有人都能领,且领到的是该红包的所有金额)

主题交换机Topic:绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列。(发一个口令红包,回答出对应的口令,才可以领红包,且领到的是该红包的所有金额)

持久化:

要将交换机,队列,消息都设置持久化功能,防止当BrokerServer出现异常宕机时,重启后,存储在上面的数据丢失。(将其都存储在磁盘和内存上各一份)

网络通信:

生产者和消费者都是客户端,BrokerServer作为服务端,通过网络进行通信,客户端要提供对应的API,来实现对服务器的调用。

1. 创建 Connection

2. 关闭 Connection

3. 创建 Channel

4. 关闭 Channel

5. 创建队列 (queueDeclare)

6. 销毁队列 (queueDelete)

7. 创建交换机 (exchangeDeclare)

8. 销毁交换机 (exchangeDelete)

9. 创建绑定 (queueBind)

10. 解除绑定 (queueUnbind)

11. 发布消息 (basicPublish)

12. 订阅消息 (basicConsume)

13. 确认消息 (basicAck)

在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.

Connection 对应⼀个 TCP 连接.

Channel 则是 Connection 中的逻辑通道.⼀个 Connection 中可以包含多个 Channel.

Channel 和 Channel 之间的数据是独⽴. 不会相互⼲扰。这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接。

消息应答:

被消费的消息,要进行消息应答,当消费者的应答方式为自动时,只要消息从队列中分发出去,就会将其从队列中删除;若为手动应答模式,需要消费者自己调用确认接收消息并返回确认响应,才会将消息从队列中删除。

三、模块划分

四、创建项目

创建springboot项目,引入springWeb和MyBatis,LomBok类。

五、创建核心类

Exchange,MSGQueue,Binding,Message:

Exchange:

/**
 * 创建交换机:
 */
@Data
public class Exchange {
    //交换机的唯一标识
    private String name;
    //交换机类型:默认为直接交换件
    private ExchangeType type = ExchangeType.DIRECT;
    //是否持久化 ,默认持久化
    private boolean durable = true;
    //在交换机不再使用的时候,是否自动删除 ,默认不自动删除
    private boolean autoDelete = false;
    //选填参数
    private Map<String,Object> args;
}

关于args属性, 由于数据库没有对应的Map类型,在写入数据库时,要将其转换成String类型,从数据库中读取数据时,再将其转换成Map类型,因此,要重写args的set和get方法:

    //这里的args的数据类型是Map,数据库中没有对应的数据类型,
    // 存入数据库时,要转换成String类型,
    //从数据库中读取时,要转换成Map类型
    public String getArgs(){
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(args);
        } catch (JsonProcessingException e) {
//            throw new RuntimeException(e);
            e.printStackTrace();
        }
        return "{}";
    }
    public void setArgs(String args) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        //              将字符串转换为HashMap类型,若转换为基本数据类型时直接填就行,
        //              这里转换的是HashMap集合类型,比较复杂,需要借助匿名内部类TypeReference类实现
        this.args = mapper.readValue(args, new TypeReference<HashMap<String,Object>>() {});
    }

/**
 * 使用枚举类 声明交换机类型
 */
public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);

    private final int type;
    private ExchangeType(int type){
        this.type = type;
    }
    public int getType(){
        return this.type;
    }
}

MSGQueue:


/**
 * 创建队列类
 */
@Data
public class MSGQueue {
    //队列唯一标识
    private String name;
    //是否持久化 ,默认持久化
    private boolean durable = true;
    //是否自动删除 ,默认不自动删除
    private boolean autoDelete = false;
    //选填参数
    private Map<String,Object> args;
}

这里的args有和Excahnge同样的问题,要重写args的set和get方法:

    //这里的args的数据类型是Map,数据库中没有对应的数据类型,
    // 存入数据库时,要转换成String类型,
    //从数据库中读取时,要转换成Map类型
    public String getArgs(){
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.writeValueAsString(args);
        } catch (JsonProcessingException e) {
//            throw new RuntimeException(e);
            e.printStackTrace();
        }
        return "{}";
    }

    public void setArgs(String args) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        this.args = mapper.readValue(args, new TypeReference<HashMap<String,Object>>() {});
    }

Binding:

/**
 * 创建绑定关系 交换机->队列
 */
@Data
public class Binding {
    //要绑定的交换机名
    private String exchangeName;
    //要绑定的队列名
    private String queueName;
    //绑定关系(只有交换机为Topic类型时,该绑定才起作用)
    private String bindingKey;
}

Message:

/**
 * 创建消息
 * 后面传递消息时,需要将消息进行序列化,此处要实现Serializable接口才可以被序列化
 */
@Data
public class Message implements Serializable {
    //消息中的真正数据
    private byte[] body;
    //消息本身的一些属性:
    private BasicProperties basicProperties = new BasicProperties();
    //消息在文件中的起始位置,[offsetBeg,offsetEnd]
    //由于消息要持久化存储,因此要存储在磁盘中,为了区分每一条消息,记录每条消息的起始位置
    //使用transient修改,避免被序列化
    private transient long offsetBeg = 0;
    private transient long offsetEnd = 0;
    //消息在磁盘中是否有效,该属性用于删除磁盘消息使用,使用逻辑删除
    //0x1:有效 0x0:无效
    private byte isVaild = 0x1;

    //创建消息 同时给消息分配一个id,通过UUID,创建唯一ID,消息Id以"M-"开头
    //routingKey:以参数为准,当原来的消息存在routingKey时,参数会将其覆盖
    //此处相当于使用了工厂方法创建消息对象,而没有使用构造方法,因为这样可以通过方法名获取到相关信息
    public Message createMessageById(String routingKey,BasicProperties basicProperties,byte[] body){
        Message message = new Message();
        message.setMessageId("M-"+ UUID.randomUUID());
        message.setRoutingKey(routingKey);
        if(basicProperties!=null){
            message.setBasicProperties(basicProperties);
        }
        message.setBody(body);
        return message;
    }

    //为了方便引用,将BasicProperties中的属性在这里调用一下,方便直接通过Message调用其基本属性
    public String getMessageId(){
        return basicProperties.getMessageId();
    }

    public void setMessageId(String messageId){
        basicProperties.setMessageId(messageId);
    }

    public String getRoutingKey(){
        return basicProperties.getRoutingKey();
    }
    public void setRoutingKey(String routingKey){
        basicProperties.setRoutingKey(routingKey);
    }

    public int getDeliveryMode(){
        return basicProperties.getDeliveryMode();
    }
    public void setDeliveryMode(int deliveryMode){
        basicProperties.setDeliveryMode(deliveryMode);
    }
}

/**
 * 保存消息的属性信息
 * 为保证消息可以被序列化,此处也要实现Serializable接口
 */
@Data
public class BasicProperties implements Serializable {
    //消息的唯一标识
    private String messageId;
    //消息的RoutingKey,用于消息转发
    private String routingKey;
    //是否持久化
    //1:持久化 0:非持久化
    private int deliveryMode = 1;
}

六.数据库设计

要将数据持久化,就要存储到磁盘上,常用的数据库有MySQL,....

此处我们使用SQList来存储数据,SQList较于MySQL更加轻量级,使用起来也更简单.

SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件

引入pom.xml依赖:

<!--        引入 SQLite数据库-->
 <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
 <dependency>
     <groupId>org.xerial</groupId>
     <artifactId>sqlite-jdbc</artifactId>
     <version>3.49.1.0</version>
</dependency>

配置数据源applicaton.yml:

spring:
  datasource:
    url: jdbc:sqlite:./data/meta.db #约定将数据库文件放到 ./data/meta.db中
    username:  #由于sqlite不是针对客户端服务的,因此无需设置用户名和密码
    password:
    driver-class-name: org.sqlite.JDBC
mybatis:
  mapper-locations: classpath:mapper/**Mapper.xml

创建表,删除表,查询数据:


/**
 * 创建表
 */
@Mapper
public interface MetaMapper {
    //创建交换机 队列 绑定关系表
    void createExchangeTable();
    void createQueueTable();
    void createBindingTable();

    //新增数据
    void insertExchange(Exchange exchange);
    void insertQueue(MSGQueue queue);
    void insertBinding(Binding binding);

    //删除数据
    void deleteExchange(String exchangeName);
    void deleteQueue(String queueName);
    void deleteBinding(String exchangeName,String queueName);

    //查询数据
    List<Exchange> selectAllExchanges();
    List<MSGQueue> selectAllQueues();
    List<Binding> selectAllBindings();
}

由于MyBatis仅针对MySQL / Oracle ⽀持执⾏多个 SQL 语句的, 但是针对 SQLite 是不⽀持的, 只能写成多个⽅法.

MetaMapper.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.rabbitmq.mq02.mqServer.mapper.MetaMapper">
<!--    创建表-->
    <update id="createExchangeTable">
        create table if not exists exchange(
            name varchar(64) promary key,
            type int,
            durable boolean,
            autoDelete boolean,
            args varchar(1024)
        )
    </update>

    <update id="createQueueTable">
        create table if not exists queue(
            name varchar(64) primary key,
            durable boolean,
            autoDelete boolean,
            args varchar(1024)
        )
    </update>

    <update id="createBindingTable">
        create table if not exists binding(
            exchangeName varchar(64),
            queueName varchar(64),
            bindingKey varchar(64)
        )
    </update>
<!--    插入数据-->
    <insert id="insertExchange" parameterType="org.rabbitmq.mq02.mqServer.core.Exchange">
        insert into exchange values(#{name},#{type},#{durable},#{autoDelete},#{args});
    </insert>

    <insert id="insertQueue" parameterType="org.rabbitmq.mq02.mqServer.core.MSGQueue">
        insert into  queue values(#{name},#{durable},#{autoDelete},#{args})
    </insert>

    <insert id="insertBinding" parameterType="org.rabbitmq.mq02.mqServer.core.Binding">
        insert into binding values(#{exchangeName},#{queueName},#{bindingKey})
    </insert>
<!--    删除数据-->
    <delete id="deleteExchange" parameterType="java.lang.String">
        delete from exchange where name = #{name};
    </delete>
    <delete id="deleteQueue" parameterType="java.lang.String">
        delete from queue where name = #{name};
    </delete>
    <delete id="deleteBinding" parameterType="java.lang.String">
        delete from binding where bindingKey = #{bindingKey};
    </delete>
<!--    查找数据-->
    <select id="selectAllExchanges" resultType="org.rabbitmq.mq02.mqServer.core.Exchange">
        select * from exchange;
    </select>
    <select id="selectAllQueues" resultType="org.rabbitmq.mq02.mqServer.core.MSGQueue">
        select * from queue;
    </select>
    <select id="selectAllBindings" resultType="org.rabbitmq.mq02.mqServer.core.Binding">
        select * from binding;
    </select>
</mapper>

七.实现DataBaseManager类:

通过这个类来封装针对数据库的操作.整理数据库中的数据

MeatMapper要通过手动管理,而非spring注入,修改MqApplication类,新增一个属性:

@SpringBootApplication
public class Mq02Application {
    public static ConfigurableApplicationContext context;
    public static void main(String[] args) {
        context = SpringApplication.run(Mq02Application.class, args);
    }
}

DataBaseManager类:

/**
 * 通过这个类来封装针对数据库的操作.
 */
public class DataBaseManager {
//通过手动管理,非spring注入
    private MetaMapper meatMapper;

    public void init(){
        //手动管理数据
        meatMapper = Mq02Application.context.getBean(MetaMapper.class);
        //建库建表
        //1.先查询数据库是否已经存在
        if(!checkDBExists()){
            //不存在时,先建库建表
            File file = new File("./data/meta.db");
            //先创建目录
            file.mkdirs();
            //创建表
            createTable();
            //初始化,主要初始化 交换机的类型
            createDefaultExchange();
            System.out.println("[DataBaseManager] 数据库初始化完成!");
        }else{
            System.out.println("[DataBaseManager] 数据库以存在!");
        }
    }

//初始化交换机
    private void createDefaultExchange() {
        Exchange exchange = new Exchange();
        exchange.setType(ExchangeType.DIRECT);
        exchange.setName("");
        exchange.setAutoDelete(false);
        exchange.setDurable(false);
        meatMapper.insertExchange(exchange);
        System.out.println("[DataBaseManager] 初始化交换机成功!");
    }

    private void createTable() {
        meatMapper.createExchangeTable();
        meatMapper.createQueueTable();
        meatMapper.createBindingTable();
        System.out.println("[DataBaseManager] 建表成功!");
    }

    //查询数据库是否已经创建
    private boolean checkDBExists() {
        File file = new File("./data/meta.db");
        return file.exists();
    }

//封装其他数据库操作:对交换机 队列 绑定关系的 增删查
    public void insertExchange(Exchange exchange){
    meatMapper.insertExchange(exchange);
}
    public List<Exchange> selectAllExchanges(){
        return meatMapper.selectAllExchanges();
    }
    public void deleteExchange(String exchangeName){
        meatMapper.deleteExchange(exchangeName);
    }

    public void insertQueue(MSGQueue queue){
        meatMapper.insertQueue(queue);
    }
    public List<MSGQueue> selectAllQueues(){
        return meatMapper.selectAllQueues();
    }
    public void deleteQueue(String queueName){
        meatMapper.deleteQueue(queueName);
    }

    public void insertBinding(Binding binding){
        meatMapper.insertBinding(binding);
    }
    public List<Binding> selectAllBindings(){
        return meatMapper.selectAllBindings();
    }
    public void deleteBindings(String bindingKey){
        meatMapper.deleteBinding(bindingKey);
    }
}

编写DataBaseManager类的测试用例:

在实现项目的过程中,每实现一个阶段的功能,不可能一下就全部正确实现,可能会出现各种错误.

就要及时编写测试用例,对当前代码功能进行测试,防止写完了整个项目再测试,出现各种错误.

做到有错即时查找修改.

    private DataBaseManager dataBaseManager = new DataBaseManager();
    /**
     * 有些测试用例产生的数据会影响之后的测试结果,在每次测试之前,还要进行数据清除,比较麻烦
     * 这个方法,在执行每一个测试前,要执行该方法,用于初始化数据,构造好数据
     */
    @BeforeEach
    void setUp() {
        Mq02Application.context = SpringApplication.run(MetaMapper.class);
        //在执行每个测试之前,进行数据库的创建和舒适化工作:
        dataBaseManager.init();
    }
    /**
     * 在执行每个测试用例之后,都要执行该方法,用于数据的清除和资源的释放,
     */
    @AfterEach
    void tearDown() {
        //释放资源,删除数据库
        Mq02Application.context.close();
        dataBaseManager.deleteTable();
    }

再在DataBaseManager类中添加删除数据库及目录方法:

    //删除数据库及创建的目录:
    public void deleteTable() {
        File file = new File("./data/meta.db");
        //1.删除数据库:
        boolean ok = file.delete();
        if(ok){
            System.out.println("[DataBaseManager] 删除数据库成功!");
        }else{
            System.out.println("[DataBaseManager] 删除数据库失败!");
        }
        //2.删除目录:
//注意,在windows系统上,只有目录为空时,才能删除成功
        File file1 = new File("./data");
        ok = file1.delete();
        if(ok){
            System.out.println("[DataBaseManager] 删除目录成功!");
        }else{
            System.out.println("[DataBaseManager] 删除目录失败!");
        }
    }

测试init()方法:

    @Test
    void init(){
    //初始化方法已经在setUp方法中调用过了,这里只要判断数据是否正确:
        List<Exchange> exchanges = dataBaseManager.selectAllExchanges();
        List<MSGQueue> msgQueues = dataBaseManager.selectAllQueues();
        List<Binding> bindings = dataBaseManager.selectAllBindings();
        //判断创建结果是否正确,可以通过打印结果,自己观察比对,
        //还可以调用专门的测试方法,直接与期望值进行对比,判断是否正确
        //assertEquals方法的第一个参数默认为期望值,第二个参数为实际值
        //判断期望值与实际值是否一致,一致返回true,不一致返回false
        //交换机在初始化的时候已经初始化了一个,因此,查询到的交换机的个数应为1个,队列和绑定为0个
        Assertions.assertEquals(1,exchanges.size());
        Assertions.assertEquals(0,msgQueues.size());
        Assertions.assertEquals(0,bindings.size());
    }

对交换机,队列,绑定的插入删除测试:

//对交换机的测试   
 //先创建一个交换机实例:
    private Exchange createExchange(String exchangeName){
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
//        exchange.setArgs(null);
        return exchange;
    }
    @Test
    void insertExchange() {
        Exchange exchange = createExchange("exchangeTest");
        dataBaseManager.insertExchange(exchange);
        List<Exchange> exchanges = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2,exchanges.size());
        Exchange e = exchanges.get(1);
        Assertions.assertEquals("exchangeTest",e.getName());
        Assertions.assertEquals(ExchangeType.DIRECT,e.getType());
        Assertions.assertTrue(e.isDurable());
        Assertions.assertFalse(e.isAutoDelete());
    }

    @Test
    void deleteExchange() {
        Exchange exchange = createExchange("exchangeTest");
        dataBaseManager.insertExchange(exchange);
        List<Exchange> exchanges = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2,exchanges.size());

        dataBaseManager.deleteExchange("exchangeTest");
        exchanges = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(1,exchanges.size());

    }
//对队列的测试
    private MSGQueue createQueue(String queueName){
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setAutoDelete(false);
        return queue;
    }
    @Test
    void insertQueue() {
        MSGQueue queue = createQueue("queueTest");
        dataBaseManager.insertQueue(queue);
        List<MSGQueue> queues = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(1,queues.size());
        Assertions.assertEquals("queueTest",queues.get(0).getName());
        Assertions.assertTrue(queues.get(0).isDurable());
        Assertions.assertFalse(queues.get(0).isAutoDelete());
    }

    @Test
    void deleteQueue() {
        MSGQueue queue = createQueue("queueTest");
        dataBaseManager.insertQueue(queue);
        List<MSGQueue> queues = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(1,queues.size());

        dataBaseManager.deleteQueue("queueTest");
        queues = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(0,queues.size());
    }

//对绑定的测试:
    private Binding createBinding(String bindingKey){
        Binding binding = new Binding();
        binding.setExchangeName("exchangeTest");
        binding.setQueueName("queueTest");
        binding.setBindingKey(bindingKey);
        return binding;
    }
    @Test
    void insertBinding() {
        //创建绑定之前,要先创建交换机和队列
        Exchange exchange = createExchange("exchangeTest");
        dataBaseManager.insertExchange(exchange);
        List<Exchange> exchanges = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2,exchanges.size());

        MSGQueue queue = createQueue("queueTest");
        dataBaseManager.insertQueue(queue);
        List<MSGQueue> queues = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(1,queues.size());

        Binding binding = createBinding("bindingKeyTest");
        dataBaseManager.insertBinding(binding);
        List<Binding> bindings = dataBaseManager.selectAllBindings();
        Assertions.assertEquals(1,bindings.size());
        Binding b = bindings.get(0);
        Assertions.assertEquals("bindingKeyTest",b.getBindingKey());
        Assertions.assertEquals("exchangeTest",b.getExchangeName());
        Assertions.assertEquals("queueTest",b.getQueueName());
    }

    @Test
    void deleteBindings() {
        //创建绑定之前,要先创建交换机和队列
        Exchange exchange = createExchange("exchangeTest");
        dataBaseManager.insertExchange(exchange);
        List<Exchange> exchanges = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2,exchanges.size());

        MSGQueue queue = createQueue("queueTest");
        dataBaseManager.insertQueue(queue);
        List<MSGQueue> queues = dataBaseManager.selectAllQueues();
        Assertions.assertEquals(1,queues.size());

        Binding binding = createBinding("bindingKeyTest");
        dataBaseManager.insertBinding(binding);
        List<Binding> bindings = dataBaseManager.selectAllBindings();
        Assertions.assertEquals(1,bindings.size());

        dataBaseManager.deleteBindings("bindingKeyTest");
        bindings = dataBaseManager.selectAllBindings();
        Assertions.assertEquals(0,bindings.size());
    }

八.消息存储设置

消息要存储在磁盘中,从文件中读写,消息较于交换机队列来说,查询次数较少,仅需要读和取消息.

因此从文件中读取比从数据库中查找要快很多.

根据消息所在不同的队列,为每个队列命名一个目录,为 data/队列名.eg: ./data/testQueue.

每个队列所目录中包含两个文件: 消息数据文件,消息统计文件. 

数据文件queue_data.txt : 存储消息体相关数据.

规定文件格式:使用二进制存储(节省空间).每个消息分成2部分,第一部分存储该消息对象的长度,规定占4字节; 第二部分存储消息的数据部分.消息和消息之间首尾相连.

统计文件queue_stat.txt: 存储该数据文件中的总消息数和有效消息数,

规定使用文本格式存储, 仅存储一行两列数据,第一列为数据文件中的总消息数,第二行为有效消息数中间用\t进行分隔.eg: 100\t50.

九.创建MessageFileManager类

创建MessageFileManager类:

/**
 * 用于管理消息在文件中的存储
 * 该类中药完成的事情:1.创建数据文件,统计文件的目录,文件
 *               2.读写统计文件
 *               3.读写数据文件
 *               4.读取文件中所有的有效消息
 *               5.对文件中的消息进行垃圾回收
 */
public class MessageFileManager {
    /**
     * 创建一个内部类,表示消息的统计文件类
     */
    public static class Stat {
        //消息总数
        public int totalCount;
        //有效消息数
        public int validCount;
    }
    //这里目前没有要初始化的
    public void init(){}
}

实现该类的功能:

1.创建数据文件:统计文件的目录,文件

//获取队列文件目录名:
    public String getQueueDir(String queueName){
        return "./data/"+queueName;
    }
    //获取消息 数据文件 路径名
    public String getDataFilePath(String queueName){
        return getQueueDir(queueName)+"/queue_data.txt";
    }
    //获取消息 统计文件 路径名
    public String getStatFilePath(String queueName){
        return getQueueDir(queueName)+"/queue_stat.txt";
    }
    //创建队列文件
    public void createQueueFile(String queueName) throws MqException, IOException {
        File file = new File(getQueueDir(queueName));
        //当目录不存在时,创建目录
        if (!file.exists()) {
            boolean ok = file.mkdirs();
            if (!ok) {
                throw new MqException("[MessageFileManager] 创建目录失败!");
            }
        }
        //创建文件
        File dataFile = new File(getDataFilePath(queueName));
        if (!dataFile.exists()) {
            //数据文件不存在时,创建数据文件:
            boolean ok = dataFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 创建数据文件失败!");
            }
        }
        File statFile = new File(getStatFilePath(queueName));
        if (!statFile.exists()) {
            boolean ok = statFile.createNewFile();
            if (!ok) {
                throw new MqException("[MessageFileManager] 创建统计文件失败!");
            }
        }
       //初始化统计文件数据
        Stat stat = new Stat();
        stat.totalCount = 0;
        stat.validCount = 0;
        writeStat(queueName,stat);
        System.out.println("[MessageFileManager] 创建目录及文件成功!");
    }
    //销毁文件和目录
 //删除指定队列的目录和文件
    //当队列被删除后,对应的文件和目录也就要被删除了
    public void destoryQueueFile(String queueName) throws IOException {
        //1.删除数据文件
        File dataFile = new File(getDataFilePath(queueName));
        boolean ok1 = dataFile.delete();
        //2.删除统计文件
        File statFile = new File(getStatFilePath(queueName));
        boolean ok2 = statFile.delete();
        //3.删除总目录
        File baseDir = new File(getQueueDir(queueName));
        boolean ok3 = baseDir.delete();
        if(!ok1 || !ok2 || !ok3){
            throw new IOException("删除队列和文件失败! baseDir: "+baseDir.getAbsolutePath());
        }
    }

自定义一个MqException异常,

用于处理抛出消息队列过程中的逻辑异常.

/**
 * 与项目有关的异常
 */
public class MqException extends Exception {
    public MqException(String reason){
        super(reason);
    }
}

2.实现读写统计文件功能:

//从统计文件中读取消息
    private Stat readStat(String queueName){
        Stat stat = new Stat();
        try (FileInputStream fileInputStream = new FileInputStream(getStatFilePath(queueName))){
            //通过Scanner来读取文件中的数据
            Scanner scan = new Scanner(fileInputStream);
            stat.totalCount = scan.nextInt();
            stat.validCount = scan.nextInt();
            return stat;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    
    //写消息到统计文件中
    private void writeStat(String queueName,Stat stat){
        //注意:此处打开文件,默认是清空文件的,可以通过设置第二个参数为true:拼接文件形式打开文件,不会清空数据
        try(FileOutputStream outputStream = new FileOutputStream(getStatFilePath(queueName))){
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount+"\t"+stat.validCount);
            //注意:要刷新缓冲区,将写入的数据从缓冲区都放到磁盘中
            printWriter.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

3.实现读写数据文件功能:

将消息存储到文件中是以二进制形式存储的,要先将消息进行序列化,然后再进行写入.读取消息数据的时候,也要将二进制文件进行反序列化,才能看懂文件内容.

这里先实现一个序列化和反序列工具类:


/**
 * 消息序列化和饭系列化工具,将所有方法都用static修饰,直接通过类名调用方法
 * 序列化:将对象转化为字节数组
 * 反序列化:将字节数组转化为对象
 */
public class BinaryTool {
    //序列化: 将对象转为字节数组
    //将Object序列化的数据逐渐写到ByteArrayOutputStream中,再转换为byte[]
    public static byte[] toByte(Object object) throws IOException {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
                //writeObject方法就会将对象进行序列换,转换为二进制数据写入到ObjectOutputStream中,
                //由于ObjectOutputStream又关联了ByteArrayOutputStream,
                //于是最终就写入到了ByteArrayOutputStream中
                objectOutputStream.writeObject(object);
            }
            return byteArrayOutputStream.toByteArray();
        }
    }
    //反序列化: 将字节数组转化为对象
    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {
            try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
                object = objectInputStream.readObject();
            }
        }
        return object;
    }
}

4.实现读写消息方法:


    //向数据文件中发送消息
    //1.判断文件是否存在
    //2.将消息序列化
    //3.写入数据文件
    //4.更新统计文件数据
    //考虑线程安全问题:
    //当多个线程同时发送消息时,可能会发送线程安全问题,这里以队列为维度加锁,因此要传入参数队列对象
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
        //1.判断文件是否存在:
        boolean ok = checkFileExists(queue.getName());
        if(!ok){
            throw new MqException("[MessageFileManager] 数据文件不存在,发送消息到数据文件失败 queueName: "+queue.getName());
        }
        synchronized(queue){
            //2.将消息先进行序列化:
            byte[] messageByte = BinaryTool.toByte(message);
            //3.写消息到文件中:
            File file = new File(getDataFilePath(queue.getName()));
            //设置 消息在文件中的初始位置属性:
            message.setOffsetBeg(file.length()+4);
            message.setOffsetEnd(file.length()+4+messageByte.length);
            //这里打开文件要以追加的方式打开,不能清空文件中的内容
            try(FileOutputStream outputStream = new FileOutputStream(file,true);
                DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
                //先写前4个字节:记录消息的长度,
                //注意,这里规定的就是用4字节的长度,不能单使用write写入,这样还是只写入1个字节
                dataOutputStream.writeInt(messageByte.length);
                dataOutputStream.write(messageByte);
            }
            //更新统计文件数据
            Stat stat = readStat(queue.getName());
            stat.totalCount+=1;
            stat.validCount+=1;
            writeStat(queue.getName(),stat);
        }
    }

    private boolean checkFileExists(String queueName) throws MqException {
        File baseFile = new File(getQueueDir(queueName));
        if(!baseFile.exists()){
            System.out.println("[MessageFileManager] 队列对应的目录不存在 queueName:"+queueName);
            return false;
        }
        File dataFile = new File(getDataFilePath(queueName));
        if(!dataFile.exists()){
            System.out.println("[MessageFileManager] 队列对应的数据文件不存在 queueName:"+queueName);
            return false;
        }
        File statFile = new File(getStatFilePath(queueName));
        if(!statFile.exists()){
            System.out.println("[MessageFileManager] 队列对应的统计文件不存在 queueName:"+queueName);
            return false;
        }
        return true;
    }

    //将一条消息从文件中删除(逻辑删除,将isValid置为无效:0x0
    //1.先读取到消息
    //2.将消息的isValid设为无效
    //3.再重新写入到文件原来位置
    //4.修改配置文件数据
    //删除文件也存在线程安全问题,要以队列为维度进行上锁
    public void deleteMessageFromFile(MSGQueue queue,Message message){
        synchronized(queue){
            //要找消息的位置,采用随机读取文件方法,且以可读可写的方式打开文件
            try(RandomAccessFile randomAccessFile = new RandomAccessFile(getDataFilePath(queue.getName()),"rw")){
                int len = (int)(message.getOffsetEnd() - message.getOffsetBeg());
                byte[] data = new byte[len];
                //定位光标到消息的起始位置:
                randomAccessFile.seek(message.getOffsetBeg());
                //从消息的起始位置读取文件
                randomAccessFile.read(data);
                //将数据反序列化:
                 Message newMessage = (Message) BinaryTool.fromBytes(data);
                 //修改为无效消息
                newMessage.setIsVaild((byte)0x0);
                //再将消息写入到文件中:
                //先进行序列化消息
                byte[] payload = BinaryTool.toByte(newMessage);
                //要写到原来位置,在读取的时候,光标会变化,要重新设置光标,
                randomAccessFile.seek(message.getOffsetBeg());
                randomAccessFile.write(payload);

                //更新统计文件数据
                Stat stat = readStat(queue.getName());
                if(stat.validCount>0){
                    stat.validCount-=1;
                }
                writeStat(queue.getName(),stat);
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
    }

5.加载文件中的所有有效消息;

用于服务器重启时恢复数据使用:


    //获取文件中的所有有效数据,该方法用户服务器宕机重启时,恢复数据
    public List<Message> loadAllMessage(String queueName) throws IOException, MqException, ClassNotFoundException {
        LinkedList<Message> messages = new LinkedList<>();
        try(FileInputStream fileInputStream = new FileInputStream(getDataFilePath(queueName));
            DataInputStream dataInputStream = new DataInputStream(fileInputStream)){
            long currentOffset = 0;
            while(true){
                int len = dataInputStream.readInt();
                //读取数据
                byte[] data = new byte[len];
                int n = dataInputStream.read(data);
                if(n!=len){
                    throw new MqException("[MessageFileManager] 读取消息格式有误! queueName:"+queueName);
                }
                //进行反序列化
                Message message = (Message) BinaryTool.fromBytes(data);
                if(message.getIsVaild()==0x0){
                    //为无效消息,直接跳过
                    currentOffset+=(4+len);
                    continue;
                }
                //设置消息的起始位置
                message.setOffsetBeg(currentOffset+4);
                message.setOffsetEnd(currentOffset+4+len);
                currentOffset+=(4+len);
                messages.add(message);
            }
        } catch (EOFException e){
            //这个异常是读取到文件末尾,抛出的异常,属于正常逻辑
            System.out.println("[MessageFileManager] 加载消息完成 !");
        }
        return messages;
    }

6.实现垃圾清除功能:GC:


    //进行垃圾回收:当统计文件中记录的有效数据数量占总数据数量<50%时,进行垃圾回收
    //垃圾回收:新创建一个文件,将有效消息复制到新文件中,复制完后,将源文件删除,将新文件名改为源文件名
    public void GC(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {
        //创建一个新的文件:
        File newFile = new File(getNewDataFilePath(queue.getName()));
        if(newFile.exists()){
            throw new MqException("[MessageFileManager] 上次GC未执行成功, queueName:"+queue.getName());
        }
        boolean ok = newFile.createNewFile();
        if(!ok){
            throw new MqException("[MessageFileManager] 创建新文件失败! queueName:"+queue.getName());
        }
        //将原来文件中的消息都取出来
        List<Message> messages = loadAllMessage(queue.getName());
        //将消息进行序列化,再存入新文件中
        try(FileOutputStream outputStream = new FileOutputStream(newFile,true);
            DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
            for(Message m:messages){
                byte[] payload = BinaryTool.toByte(m);
                dataOutputStream.writeInt(payload.length);
                dataOutputStream.write(payload);
            }
        }
        //删除旧文件
        String oldDataFilePath = getDataFilePath(queue.getName());
        File oldFile = new File(oldDataFilePath);
        ok = oldFile.delete();
        if(!ok){
            throw new MqException("[MessageFileManager] 旧文件删除失败! oldFilePath:"+oldFile.getAbsolutePath());
        }
        //更新新文件名:
        ok = newFile.renameTo(oldFile);
        if(!ok){
            throw new MqException("[MessageFileManager] 更新文件名失败 newFilePath"+newFile.getAbsolutePath());
        }
        //更新统计文件数据
        Stat stat = readStat(queue.getName());
        stat.totalCount = messages.size();
        stat.validCount = messages.size();
        writeStat(queue.getName(),stat);
    }
//创建一个新的文件路径:
    private String getNewDataFilePath(String queueName) {
        return getQueueDir(queueName)+"newQueue_data.txt";
    }
//查看当前队列是否需要进行GC扫描
    public boolean checkGC(String queueName){
        Stat stat = readStat(queueName);
        int t1 = stat.validCount;
        int t2 = stat.totalCount;
        if(t2>=2000 && t1*1.0 /t2*1.0<0.5){
            return true;
        }
        return false;
    }

编写MessageFileManager测试用例:

@SpringBootTest
class MessageFileManagerTest {
    private MessageFileManager messageFileManager = new MessageFileManager();
    private static String queueName1 = "queueTest1";
    private static String queueName2 = "queueTest2";
  //创建2个文件
    @BeforeEach
    void setUp() throws IOException, MqException {
        messageFileManager.createQueueFile(queueName1);
        messageFileManager.createQueueFile(queueName2);
        System.out.println("2个文件创建成功!");
    }
//销毁文件
    @AfterEach
    void tearDown() throws IOException {
        messageFileManager.destoryQueueFile(queueName1);
        messageFileManager.destoryQueueFile(queueName2);
        System.out.println("2个文件销毁成功!");
    }

    @Test
    void createQueueFile() {
        //在setUp方法中已经调用了该方法了,这里仅进行结果测试对比了
        File dataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
        File dataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");

        File statFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        File statFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");

        Assertions.assertTrue(dataFile1.exists());
        Assertions.assertTrue(dataFile2.exists());

        Assertions.assertTrue(statFile1.exists());
        Assertions.assertTrue(statFile2.exists());
    }

    @Test
    void readWriteStat() {
        MessageFileManager.Stat stat = new MessageFileManager.Stat();
        stat.totalCount = 100;
        stat.validCount = 50;

        //由于对统计文件的读写访问方法都是私有方法,无法直接进行调用
        //但又必须进行测试,可以通过反射的方法进行访问
        //注意:这里不能通过先修改督学stat方法的访问权限,进行测试,然后再将访问权限改过来,
        //因为对原代码测试后再修改后的测试就是无效的,不知道修改后的代码是否还会出现别的问题
        //                 参数说明:      要反射的方法的类对象    要反射的方法名      方法的参数
        ReflectionTestUtils.invokeMethod(messageFileManager,"writeStat",queueName1,stat);
        MessageFileManager.Stat s = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);

        Assertions.assertEquals(100,s.totalCount);
        Assertions.assertEquals(50,s.validCount);
    }

    private MSGQueue createQueue(){
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName1);
        queue.setDurable(true);
        queue.setAutoDelete(false);
        return queue;
    }

    private Message createMessage(String body){
        Message message = new Message();
        return message.createMessageById("routingKeyTest",null,body.getBytes());
    }

    @Test
    void sendMessage() throws IOException, MqException, ClassNotFoundException {
        MSGQueue queue = createQueue();
        Message message = createMessage("hello");
        messageFileManager.sendMessage(queue,message);

        //测试stat文件
        MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(1,stat.validCount);
        Assertions.assertEquals(1,stat.totalCount);

        //测试data文件
        List<Message> messages = messageFileManager.loadAllMessage(queueName1);
        Assertions.assertEquals(1,messages.size());

        Message m = messages.get(0);
        Assertions.assertEquals(message.getMessageId(),m.getMessageId());
        Assertions.assertEquals(message.getRoutingKey(),m.getRoutingKey());
        Assertions.assertEquals(message.getDeliveryMode(),m.getDeliveryMode());
        Assertions.assertArrayEquals(message.getBody(),m.getBody());
        System.out.println(m);
    }

    @Test
    void deleteMessageFromFile() throws IOException, MqException, ClassNotFoundException {
        MSGQueue queue = createQueue();
        List<Message> list = new LinkedList<>();
        //向队列中发送10条消息
        for(int i=0;i<5;i++){
            Message message = createMessage("hello" + i);
            messageFileManager.sendMessage(queue,message);
            list.add(message);
        }
        Assertions.assertEquals(5,list.size());
        //删除后3条
        messageFileManager.deleteMessageFromFile(queue,list.get(3));
        messageFileManager.deleteMessageFromFile(queue,list.get(4));
        messageFileManager.deleteMessageFromFile(queue,list.get(2));

        List<Message> messages = messageFileManager.loadAllMessage(queueName1);
        Assertions.assertEquals(2,messages.size());

        for(int i=0;i<2;i++){
            Message m = messages.get(i);
            Message m2 = list.get(i);
            Assertions.assertEquals(m2.getRoutingKey(),m.getRoutingKey());
            Assertions.assertEquals(m2.getDeliveryMode(),m.getDeliveryMode());
            Assertions.assertEquals(m2.getMessageId(),m.getMessageId());
            Assertions.assertArrayEquals(m2.getBody(),m.getBody());
            Assertions.assertEquals(m2.getIsVaild(),m.getIsVaild());
        }
    }

    @Test
    void loadAllMessage() throws IOException, MqException, ClassNotFoundException {
        MSGQueue queue = createQueue();
        List<Message> list = new LinkedList<>();
        //向队列中添加100条消息
        for(int i=0;i<100;i++){
            Message message = createMessage("hello"+i);
            messageFileManager.sendMessage(queue,message);
            list.add(message);
        }
        List<Message> messages = messageFileManager.loadAllMessage(queue.getName());
        Assertions.assertEquals(100,messages.size());

        for(int i=0;i<100;i++){
            Message m1 = messages.get(i);
            Message m2 = list.get(i);
            Assertions.assertEquals(m2.getRoutingKey(),m1.getRoutingKey());
            Assertions.assertEquals(m2.getDeliveryMode(),m1.getDeliveryMode());
            Assertions.assertEquals(m2.getMessageId(),m1.getMessageId());
            Assertions.assertArrayEquals(m2.getBody(),m1.getBody());
            Assertions.assertEquals(m2.getIsVaild(),m1.getIsVaild());
        }
    }

    @Test
    void GC() throws IOException, MqException, ClassNotFoundException {
        MSGQueue queue = createQueue();
        List<Message> list = new LinkedList<>();
        //向队列中发送10条消息
        for(int i=0;i<100;i++){
            Message message = createMessage("hello" + i);
            messageFileManager.sendMessage(queue,message);
            list.add(message);
        }
        File file = new File("./data/" + queue.getName() + "/queue_data.txt");
        System.out.println("GC前文件大小: "+file.length());

        Assertions.assertEquals(100,list.size());
        //删除偶数消息
        for(int i=0;i<100;i+=2){
            Message message = list.get(i);
            messageFileManager.deleteMessageFromFile(queue,message);
        }

        messageFileManager.GC(queue);
        System.out.println("GC后文件大小: "+file.length());

        List<Message> messages = messageFileManager.loadAllMessage(queueName1);
        for(int i=0;i<messages.size();i++){
            Message m1 = messages.get(i);
            Message m2 = list.get(2*i+1);
            Assertions.assertEquals(m2.getRoutingKey(),m1.getRoutingKey());
            Assertions.assertEquals(m2.getDeliveryMode(),m1.getDeliveryMode());
            Assertions.assertEquals(m2.getMessageId(),m1.getMessageId());
            Assertions.assertArrayEquals(m2.getBody(),m1.getBody());
            Assertions.assertEquals(m2.getIsVaild(),m1.getIsVaild());
        }
    }
}

十.整合数据库和文件数据

上面的代码中,使用数据库存储了Exchange,Queue,Binding,使用文件存储了Message,

下面对数据库和文件中的数据进行整合.进行统一管理.

见下一篇博客.


网站公告

今日签到

点亮在社区的每一天
去签到