【消息队列】数据库的数据管理

发布于:2025-03-06 ⋅ 阅读:(23) ⋅ 点赞:(0)

1. 数据库的选择

对于当前实现消息队列这样的一个中间件来说,具体要使用哪个数据库,是需要稍作考虑的,如果直接使用 MySQL 数据库也是能实现正常的功能,但是 MySQL 也是一个客户端服务器程序,也就意味着如果想在其他服务器上部署这个消息队列的项目,还得需要安装 MySQL,其实是不够轻量化的!!!

此处为了使用更方便,能将这里实现的消息队列单独使用,简化配置环境,于是采用的数据库是更轻量级的数据库,SQLite。

SQLite 应用非常的广泛,尤其是在一些性能不高的设备上使用数据库的首选,一个完整的 SQLite 数据库,只有一个单独的可执行文件,体量特别小,不到 1M,我们甚至只需要在 maven 中引入相关依赖就可以使用 MyBatis 操作数据库了。

对比 MySQL 来说,SQLite 只是一个本地的数据库,并不是一个客户端服务器结构的程序,而是相当于直接操作本地的硬盘文件。

在 pom.xml 中引入 SQLite:

<dependency>
    <groupId>org.xerial</groupId>
    <artifactId>sqlite-jdbc</artifactId>
    <version>3.42.0.0</version>
</dependency>

application.yml 中配置 SQLite 和 MyBatis 匹配路径:

spring:
  datasource:
    url: jdbc:sqlite:./data/meta.db
    username:
    password:
    driver-class-name: org.sqlite.JDBC
    
mybatis:
  mapper-locations: classpath:mapper/**Mapper.xml

mybatis 配置项的配置就不用说了,这里主要是了解 datasource 配置项里面的 url,这里的 url 就是 SQLite 把数据存储在当前硬盘的某个指定的文件中。

此处使用的是相对路径,如果是在 IDEA 中直接运行程序,此时的工作路径就是当前项目所在的路径,如果是通过 java -jar 方式运行程序,此时在哪个目录下执行的命令,哪个目录就是工作目录。

而且此处的 username 和 password 是不需要声明的,MySQL 是一个客户端服务器程序,就可能会有很多个客户端去访问它,而 SQLite 不是客户端服务器程序,只有本地主机才能访问了(数据库存储在本地)。

虽然 MySQL 和 SQLite 不太一样,但是它们同样可以使用 MyBatis 这样的框架来操作。

完整的 xml 依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>3.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter-test</artifactId>
            <version>3.0.4</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.42.0.0</version>
        </dependency>
    </dependencies>

2. 要存储到数据库中的数据

其实对于要存储到数据库中的数据,在思维导图中已经写出来了,那为什么这些数据要放在数据库中存储?换句话来说,这些数据使用文件来存储不行吗?

对于需要存储到数据库中的数据有:

交换机,队列,绑定

为什么交换机与队列绑定不能使用文件存储呢?其实也行,只是在考虑效率问题方面和业务需求上的考虑,最终使用数据库存储,这里想象一下,前面提到的 BrokerServer 需要提供的 API 中,创建交换机,创建队列,创建绑定(根据交换机和队列是否持久化来判断绑定是否需要持久化),交换机和队列是可以选择是否持久化的,如果选择了持久化,才说明需要持久化,对于持久化的队列和交换机来说,也不需要反复的增删改查,因为在内存中,也会有一份这样的数据,此时既然内存中有,那为什么要走数据库查询呢?其实本质上队列,交换机,绑定的持久化,只需要在项目重新启动的时候,把数据库中持久化的数据恢复到内存中就可以了。所以只有当 BrokerServer 启动时会恢复数据库的数据到内存中(查询数据库),再者只会在新增队列,新增交换机,新增绑定(插入记录)时可能会操作数据库,其他时候,都是操作在操作内存中的数据的。

所以这样一来,选择数据库存储是完全够用的,但是存储消息为何要使用文件,不推荐使用数据库呢?这里在后面讲到消息存储时会详细讲解。

3. 设计实体类

这里需要先在 SpringBoot 启动类目录下创建一个 mqserver 目录,这个目录用来放 BrokerServer 需要用到的代码,接下来在 mqserver 目录下,在创建一个 core 目录,这里设计的实体类,也就是放在 core 目录下。
在这里插入图片描述

3.1 Exchange 实体类

对于交换机主要由这属性组成:

身份唯一标识:String name

交换机的类型:ExchangeType type

是否持久化:boolean durable

是否自动删除:boolean autoDelete

额外参数选项:Map<String , Object> arguments

对于上述的自动删除,和额外参数选项,此项目就不再进行处理,只是留有一个口子方便随时扩展。

对于这个交换机类型,此处是单独提拎出一个枚举类来表示:

package com.example.messagequeue.mqserver.core;

public enum ExchangeType {
    DIRECT(0), // 直接交换机
    FANOUT(1), // 扇出交换机
    TOPIC(2);  // 主题交换机

    private final int type;

    private ExchangeType(int type) {
        this.type = type;
    }

    public int getType() {
        return type;
    }
}

对于 arguments 虽然不实现具体的功能,但是还是为了避免后续扩展时能顺利的保存到数据库中,此时就需要考虑,数据库中如何存储一个 Map

数据库本身是没有 Map 这样的类型供我们使用的,但是可以把 Map 转换成 json 字符串,在查询的时候,在把这个 json 字符串转换回 Map 就可以了。此处可以使用 ObjectMapper 这样的一个对象进行对 Java 的 json 字符串的序列化和反序列化。

既然这样的思路是可行的,问题来到如何让 MyBatis 框架帮我们存的时候把对象转序列化成 json,数据库中存 json 字符串,取的时候把 json 字符串反序列化成 Java 对象呢?

其实在 MyBatis 完成数据库操作的时候,会自动调用到对象的 getter 和 setter 方法。

当 MyBatis 往数据库中写数据时,就会调用对象的 getter 方法拿到属性的值再往数据库中写,当 MyBatis 从数据库中读数据的时候,就会调用对象的 setter 方法,把数据库中读到的结果设置到对象的属性中。

了解了 MyBatis 会这样操作后,我们只需要针对 arguments 参数的 getter,setter 方法做修改即可。

让 getter 方法返回一个 json 字符串,让 setter 方法形参接收一个 json 字符串就可以了。于是 arguments 的 getter 和 setter 就可以写成这样:

public String getArguments() {
    // 把当前的 arguments 转成 json
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        return objectMapper.writeValueAsString(arguments);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    return "{}";
}

public void setArguments(String argumentsJson) {
    // 数据库读到的 json 转换成对象
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        this.arguments = objectMapper.readValue(argumentsJson, 
                  new TypeReference<HashMap<String, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
    }
}

// 重载一下 arguments 的 getter 和 setter 方便后续使用
public Object getArguments(String key) {
    return arguments.get(key);
}
public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
}
public void setArguments(String key, Object value) {
    this.arguments.put(key, value);
}

Exchange 完整代码:

package com.example.messagequeue.mqserver.core;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

/**
 * 这个类表示交换机
 */
public class Exchange {
    // 身份标识(唯一)
    private String name;

    // 交换机类型, DIRECT, FANOUT, TOPIC
    private ExchangeType type = ExchangeType.DIRECT;

    //交换机是否要持久化存储
    private boolean durable = false;

    // 没人使用时是否自动删除
    private boolean autoDelete = false;

    // 创建交换机时指定的一些额外的参数选项
    private Map<String , Object> arguments = new HashMap<>();

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ExchangeType getType() {
        return type;
    }

    public void setType(ExchangeType type) {
        this.type = type;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Object getArguments(String key) {
        return arguments.get(key);
    }

    public String getArguments() {
        // 把当前的 arguments 转成 json
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
           e.printStackTrace();
        }
        return "{}";
    }

    public void setArguments(String key, Object value) {
        this.arguments.put(key, value);
    }

    public void setArguments(String argumentsJson) {
        // 数据库读到的 json 转换成对象
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            this.arguments = objectMapper.readValue(argumentsJson,
                    new TypeReference<HashMap<String, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

ExchangeType 类完整代码:

package com.example.messagequeue.mqserver.core;

public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);

    private final int type;

    private ExchangeType(int type) {
        this.type = type;
    }

    public int getType() {
        return type;
    }
}

3.2 MsgQueue 实体类

对于队列目前主要由这属性组成:

队列唯一标识:String name

是否持久化:boolean durable

是否只能被一个消费者使用:boolean exclusive

自动删除:boolean autoDelete

扩展参数:Map<String, Object> arguments

这里自动删除和扩展参数,也是本项目中留有扩展接口暂不实现,而 exclusive 参数,是否独有,则留到彩蛋部分。

MsgQueue 这里也没什么好说的,主要也是 arguments 这个参数的 getter 和 setter 需要注意一下。

MsgQueue 完整代码:

package com.example.messagequeue.mqserver.core;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

/**
 * 这个类表示存储消息的队列
 */
public class MsgQueue {
    // 队列的身份标识
    private String name;

    // 队列是否持久化
    private boolean durable = false;

    // 如果为 true 表示这个队列只能被一个消费者使用 n
    private boolean exclusive = false;

    // 自动删除 n
    private boolean autoDelete = false;

    // 扩展参数 n
    private Map<String, Object> arguments = new HashMap<>();

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Object getArguments(String key) {
        return arguments.get(key);
    }

    public String getArguments() {
        // 把当前的 arguments 转成 json
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "{}";
    }

    public void setArguments(String key, Object value) {
        this.arguments.put(key, value);
    }

    public void setArguments(String argumentsJson) {
        // 数据库读到的 json 转换成对象
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            this.arguments = objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

当然上述代码其实还并不是最终代码,随着项目往后写代码,根据需求的到来也要需要进行一定的扩展。

3.3 Binding 实体类

对于绑定目前主要由这属性组成:

绑定的交换机名:String exchangeName

绑定的队列名:String queueName

绑定的匹配Key:String bindingKey

绑定实体类比较简单,bindingKey 的作用在前面章节也提到过,这里就不多介绍了,Binding 没有主键的原因是要依赖于 exchangeName 和 queueName 这两个维度来进行筛选,实体类主要是映射数据库中的数据,所以其实并不复杂,不涉及到业务,所以也就不做赘述。

Binding 完整代码:

package com.example.messagequeue.mqserver.core;

/**
 * 表示队列和交换机之间的关联关系
 */
public class Binding {

    private String exchangeName;

    private String queueName;

    // 题目
    private String bindingKey;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getBindingKey() {
        return bindingKey;
    }

    public void setBindingKey(String bindingKey) {
        this.bindingKey = bindingKey;
    }
}

4. 建表操作

实体类写好了,剩下的就是创建数据库了,对于之前的 MySQL 来说,创建一个表需要先创建一个库,create databases …,然后在 create table …,然后把写好的 SQL 放在一个 db.sql 中,然后把这个 .sql 文件或者把这个文件的内容放在 MySQL 复制粘贴一执行就行了。之前这样做确实没问题,因为这样的项目大概部署一次就够了,不会反复操作,但是这里实现的消息队列,可能会设计到多次部署,比如多个服务器都想部署。

这里有没有一种方法,通过代码来自动的完成建库建表的操作呢?

其实 MyBatis 就能做到,只是之前 xml 来实现数据库的增删改查,对应的就是不同的 xml 标签,对于 create table 这样的语句有对应的标签提供吗?

没有!!!但是可以使用 update 标签来代替,update 标签中也可以写 create 语句,把每个建表的语句,都使用一个 update 标签,并对应一个 Java 方法。能否在一个 update 标签中一次性创建多张表呢?是不行的,当一个 update 标签写了多个 create table 的时候,只有第一个语句能执行。所以这里只能采取一个 Java 方法对应一个 xml 的建表的标签。

看到这,可能有个疑问,库呢?只提到建表,难道不用建立 databases 吗?前面在 yml 中配置的:

spring:
  datasource:
    url: jdbc:sqlite:./data/meta.db
    username:
    password:
    driver-class-name: org.sqlite.JDBC

meta.db 这个文件,本质上就是此项目用到的库,咱们在代码中只需要写创建表的语句就可以了。

现在,就按照上述说的来做:
在这里插入图片描述
在 mqserver 目录下建立 mapper 目录,这里面放着一个接口为 MetaMapper.java,对应的 resources 目录下的 mapper 目录里面的 MetaMapper.xml 就是对应上述 MetaMapper.java 接口里面方法的实现。这个很基本的 MyBatis 操作了,也就不再赘述。

基本标签:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.messagequeue.mqserver.mapper.MetaMapper">


</mapper>

先是建表操作,对于这个数据库来说,应该有三张表,分别是 exchange,queue,binding。

void createExchangeTable();
void createQueueTable();
void createBindingTable();
<update id="createExchangeTable">
    create table if not exists exchange (
    name varchar(50) primary key,
    type int,
    durable boolean,
    autoDelete boolean,
    arguments varchar(1024)
    )
</update>

<update id="createQueueTable">
    create table if not exists queue (
        name varchar(50) primary key,
        durable boolean,
        exclusive boolean,
        autoDelete boolean,
        arguments varchar(1024)
    )
</update>

<update id="createBindingTable">
    create table if not exists binding (
        exchangeName varchar(50),
        queueName varchar(50),
        bindingKey varchar(256)
    )
</update>

5. 数据操作

接下来是增删改查,但是此项目不提供修改,其实也不太会去修改。

接下来就应该实现如下的 SQL 操作了:

插入一个交换机,查找所有的交换机,删除一个交换机

插入一个队列,查找所有的队列,删除一个队列

插入一个绑定,查找所有的绑定,删除一个绑定

为什么不设计一个方法,根据交换机名,或者队列名,查找交换机和队列呢?设置设计一个方法根据交换机名+队列名查找绑定呢?

前文提到过,由于这里对于 Exchange,Queue,Binding 的持久化,就是为了在项目启动的时候,将这些数据库硬盘上的数据恢复到内存中,项目运行起来了,启动了后,那个时候的查找其实就是去内存中查找了,正如思维导图所述,对于 Exchange,Queue,Binding,在内存中也会持有一份,而硬盘中是否持有,取决于客户端的选择了。

List<Exchange> selectAllExchanges();

void deleteExchange(String exchangeName);

void insertQueue(MsgQueue msgQueue);

List<MsgQueue> selectAllQueues();

void deleteQueue(String queueName);

void insertBinding(Binding binding);

List<Binding> selectAllBindings();

void deleteBinding(Binding binding);
<insert id="insertExchange" parameterType="com.example.messagequeue.mqserver.core.Exchange">
    insert into exchange values (
        #{name}, #{type}, #{durable}, #{autoDelete}, #{arguments}
    )
</insert>

<select id="selectAllExchanges" resultType="com.example.messagequeue.mqserver.core.Exchange">
        select * from exchange
</select>

<insert id="insertQueue" parameterType="com.example.messagequeue.mqserver.core.MsgQueue">
    insert into queue values(
    	#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments}
    )
</insert>

<select id="selectAllQueues" resultType="com.example.messagequeue.mqserver.core.MsgQueue">
        select * from queue
</select>

<insert id="insertBinding" parameterType="com.example.messagequeue.mqserver.core.Binding">
        insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey})
</insert>

<select id="selectAllBindings" resultType="com.example.messagequeue.mqserver.core.Binding">
        select * from binding
</select>

<delete id="deleteExchange" parameterType="java.lang.String">
        delete from exchange where name = #{exchangeName}
</delete>

<delete id="deleteQueue" parameterType="java.lang.String">
        delete from queue where name = #{queueName}
</delete>

<delete id="deleteBinding" parameterType="com.example.messagequeue.mqserver.core.Binding">
        delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName}
</delete>

完整代码如下:

MetaMapper.java 完整代码:

package com.example.messagequeue.mqserver.mapper;

import com.example.messagequeue.mqserver.core.Binding;
import com.example.messagequeue.mqserver.core.Exchange;
import com.example.messagequeue.mqserver.core.MsgQueue;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

/**
 * 源属性
 */
@Mapper
public interface MetaMapper {

    // 建表方法
    void createExchangeTable();

    void createQueueTable();

    void createBindingTable();

    // 插入删除查找操作
    void insertExchange(Exchange exchange);

    List<Exchange> selectAllExchanges();

    void deleteExchange(String exchangeName);

    void insertQueue(MsgQueue msgQueue);

    List<MsgQueue> selectAllQueues();

    void deleteQueue(String queueName);

    void insertBinding(Binding binding);

    List<Binding> selectAllBindings();

    void deleteBinding(Binding binding);
}

MetaMapper.xml 完整代码:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.messagequeue.mqserver.mapper.MetaMapper">
    <update id="createExchangeTable">
        create table if not exists exchange (
            name varchar(50) primary key,
            type int,
            durable boolean,
            autoDelete boolean,
            arguments varchar(1024)
        )
    </update>

    <update id="createQueueTable">
        create table if not exists queue (
            name varchar(50) primary key,
            durable boolean,
            exclusive boolean,
            autoDelete boolean,
            arguments varchar(1024)
        )
    </update>

    <update id="createBindingTable">
        create table if not exists binding (
            exchangeName varchar(50),
            queueName varchar(50),
            bindingKey varchar(256)
        )
    </update>

    <insert id="insertExchange" parameterType="com.example.messagequeue.mqserver.core.Exchange">
        insert into exchange values (
        	#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments}
        )
    </insert>

    <select id="selectAllExchanges" resultType="com.example.messagequeue.mqserver.core.Exchange">
        select * from exchange
    </select>

    <insert id="insertQueue" parameterType="com.example.messagequeue.mqserver.core.MsgQueue">
        insert into queue values(
        	#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments}
        )
    </insert>

    <select id="selectAllQueues" resultType="com.example.messagequeue.mqserver.core.MsgQueue">
        select * from queue
    </select>

    <insert id="insertBinding" parameterType="com.example.messagequeue.mqserver.core.Binding">
        insert into binding values(
        	#{exchangeName}, #{queueName}, #{bindingKey}
        )
    </insert>

    <select id="selectAllBindings" resultType="com.example.messagequeue.mqserver.core.Binding">
        select * from binding
    </select>

    <delete id="deleteExchange" parameterType="java.lang.String">
        delete from exchange where name = #{exchangeName}
    </delete>

    <delete id="deleteQueue" parameterType="java.lang.String">
        delete from queue where name = #{queueName}
    </delete>

    <delete id="deleteBinding" parameterType="com.example.messagequeue.mqserver.core.Binding">
        delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName}
    </delete>

</mapper>

6. 整合数据库操作

这一小节的操作,就是将上述的建表以及数据的操作整合到一个类中(DataBaseManager),后续直接使用这个类去操作数据库。

对于第一个,就是先需要初始化,也就是先建立三张需要的表,和一些基本数据,初始化数据库就在 DataBaseManager 中写一个 init() 方法,用于初始化数据库。

要想操作数据库,也就是调用前面创建的 MetaMapper 里面的方法,这里由于是 SpringBoot 的项目,这里需要手动拿到 metaMapper,可以使用注解等等,但这里采取使用 Spring 应用上下文 ApplicationContext 当中获取 metaMapper 对象就可以了。只需要将启动类修改成如下:

package com.example.messagequeue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class MessageQueueApplication {

    public static ConfigurableApplicationContext context;

    public static void main(String[] args) {
        context = SpringApplication.run(MessageQueueApplication.class, args);
    }

}

后续通过 MessageQueueApplication.context 也是可以获取到想要的 Bean 实例。

6.1 初始化数据库

public class DataBaseManager {
    // 手动拿到 metaMapper
    private MetaMapper metaMapper;

    // 数据库初始化
    public void init() {
        // 获取到 MetaMapper
        metaMapper = MyMessageQueueApplication.context.getBean(MetaMapper.class);

        if (!checkDBExists()) {
            File dataDir = new File("./data");
            dataDir.mkdirs();
            createTable();
            createDefaultData();
            System.out.println("[DataBaseManager] 初始化完成!");
        } else {
            System.out.println("[DataBaseManager] 数据库已经存在!");
        }
    }
    
    private boolean checkDBExists() {
        File file = new File("./data/meta.db");
        return file.exists();
    }

    private void createTable() {
        // 不需要手动创建 meta.db
        // 首次执行这里的数据库操作时, 就会自动创建出 meta.db 文件 (mybatis 完成的)
        metaMapper.createExchangeTable();
        metaMapper.createQueueTable();
        metaMapper.createBindingTable();
        System.out.println("[DataBaseManager] 创建表完成!");
    }

    private void createDefaultData() {
        // 添加一个默认的交换机
        // RabbitMQ 设定, 有一个匿名的交换机, 类型是 DIRECT.
        Exchange exchange = new Exchange();
        exchange.setName("");
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        metaMapper.insertExchange(exchange);
        System.out.println("[DataBaseManager] 创建初始数据完成!");
    }
}

6.2 封装操作数据的方法

// 提供方便单元测试时收尾工作要删除数据库的方法
public void deleteDB() {
    File file = new File("./data/meta.db");
    boolean ret = file.delete();
    if (ret) {
        System.out.println("[DataBaseManager] 删除数据库文件成功!");
    } else {
        System.out.println("[DataBaseManager] 删除数据库文件失败!");
    }
    File dataDir = new File("./data");
    ret = dataDir.delete();
    if (ret) {
        System.out.println("[DataBaseManager] 删除数据库目录成功!");
    } else {
        System.out.println("[DataBaseManager] 删除数据库目录失败!");
    }
}

public void insertExchange(Exchange exchange) {
    metaMapper.insertExchange(exchange);
}

public List<Exchange> selectAllExchanges() {
    return metaMapper.selectAllExchanges();
}

public void deleteExchange(String exchangeName) {
    metaMapper.deleteExchange(exchangeName);
}

public void insertQueue(MsgQueue msgQueue) {
    metaMapper.insertQueue(msgQueue);
}

public List<MsgQueue> selectAllQueues() {
    return metaMapper.selectAllQueues();
}

public void deleteQueue(String queueName) {
    metaMapper.deleteQueue(queueName);
}

public void insertBinding(Binding binding) {
    metaMapper.insertBinding(binding);
}

public List<Binding> selectAllBindings() {
    return metaMapper.selectAllBindings();
}

public void deleteBinding(Binding binding) {
    metaMapper.deleteBinding(binding);
}

上述封装操作数据库的方法很简单,本质就是调用了下 metaMapper 里面的方法。

6.3 单元测试

这里详细的单元测试就不写了,相信写过 Spring 项目的都会进行单元测试,那么此处只给出一个标准测试 DataBaseManager 类的架子就行了,按照其中的一个测试方法接着往下写新的测试用例就OK了。

package com.example.messagequeue;


import com.example.messagequeue.mqserver.core.Binding;
import com.example.messagequeue.mqserver.core.Exchange;
import com.example.messagequeue.mqserver.core.ExchangeType;
import com.example.messagequeue.mqserver.core.MsgQueue;
import com.example.messagequeue.mqserver.datacenter.DataBaseManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;

@SpringBootTest
public class DataBaseManagerTests {
    private DataBaseManager dataBaseManager = new DataBaseManager();

    /**
     * 准备工作
     */
    @BeforeEach
    private void setUp() {
        MessageQueueApplication.context = 
            SpringApplication.run(MessageQueueApplication.class);
        dataBaseManager.init();
    }

    /**
     * 收尾工作
     */
    @AfterEach
    private void tearDown() {
        // 删除数据库
        // 为什么要先关闭 context 对象呢?
        // 此处的 context 对象持有了 MetaMapper 的示例, MataMapper 实例又打开了 meta.db 数据库文件
        // 另一方面, 获取 context 操作, 会占用 8080 端口
        MessageQueueApplication.context.close();
        dataBaseManager.deleteDB();
    }

    @Test
    public void testInitTable() {
        // 由于 init 方法在上面 setUp中调用过了, 在下面代码直接检查数据库状态即可
        // 查交换机表, 里面应该有一个数据
        List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
        List<MsgQueue> msgQueueList = dataBaseManager.selectAllQueues();
        List<Binding> bindingList = dataBaseManager.selectAllBindings();

        Assertions.assertEquals(1, exchangeList.size());
        Assertions.assertEquals("", exchangeList.get(0).getName());
        Assertions.assertEquals(ExchangeType.DIRECT, exchangeList.get(0).getType());
        Assertions.assertEquals(0, msgQueueList.size());
        Assertions.assertEquals(0, bindingList.size());
    }

    private Exchange createTestExchange(String exchangeName) {
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.FANOUT);
        exchange.setAutoDelete(false);
        exchange.setDurable(true);
        exchange.setArguments("aaa", 1);
        exchange.setArguments("bbb", 2);
        return exchange;
    }

    @Test
    public void testInsertExchange() {
        Exchange exchange = createTestExchange("testExchange");
        dataBaseManager.insertExchange(exchange);
        List<Exchange> exchangeList = dataBaseManager.selectAllExchanges();
        Assertions.assertEquals(2, exchangeList.size());
        Exchange newExchange = exchangeList.get(1);
        Assertions.assertEquals("testExchange", newExchange.getName());
        Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());
        Assertions.assertEquals(false, newExchange.isAutoDelete());
        Assertions.assertEquals(true, newExchange.isDurable());
        Assertions.assertEquals(1, newExchange.getArguments("aaa"));
        Assertions.assertEquals(2, newExchange.getArguments("bbb"));
    }
    
    // ......