手搓消息队列【RabbitMQ版】

发布于:2023-09-16 ⋅ 阅读:(83) ⋅ 点赞:(0)

什么是消息队列?

阻塞队列(Blocking Queue)-> 生产者消费者模型 (是在一个进程内)
所谓的消息队列,就是把阻塞队列这样的数据结构,单独提取成了一个程序,进行独立部署~ --------> 生产者消费模型 (进程和进程之间/服务和服务之间)
生产者消费者模型作用:

  • 解耦合
    • 本来有个分布式系统,A服务器 调用 B服务器(A给B发请求,B给A返回响应)===》 A 和 B 的耦合是比较大的!
    • 引入消息队列后,A把请求发送到消息队列,B再从消息队列获取到请求
  • 削峰填谷
    • 比如A是入口服务器,A 调用 B 完成一些具体业务,如果是 A 和 B 直接通信,如果突然A 收到一组用户的请求的峰值,此时 B 也会随着受到峰值~
    • 引入消息队列后,A把请求发送到消息队列,B再从消息队列获取到请求。 (虽然A收到很多请求,队列也收到了很多请求,但是B仍旧可以按照原来的节奏处理请求。不至于说一下就收到太多的并发量。)
    • 举个例子:高铁火车站,进站口。 乘客好比A ,进站口好比B,是有限的,就需要一个队列来排队,这样不管人多少,就不会影响到乘客进站以后的坐车。

市面上一些知名的消息队列

  • RabbitMQ
  • Kafka
  • RocketMQ
  • ActiveMQ

需求分析

核心概念1
  1. 生产者(Producer)
  2. 消费者(Consumer)
  3. 中间人(Broker)
  4. 发布(Push) 生产者向中间人这里投递消息的过程
  5. 订阅(Subscribe) 哪些消费者要从中间人取数据,这个注册的过程,称为 “订阅”
  6. 消费 (Consume) 消费者从中间人这里取数据的动作

一个生产者,一个消费者

image.png

N个生产者,N个消费者

image.png

核心概念2

Broker server 内部也涉及一些关键概念(是为了如何进出队列)

  • 虚拟主机(Virtual Host),类似于 MySQL 中的 database,算是一个 “逻辑” 上的数据集合。
    • 一个Broker server 上可以组织多种不同类别数据,可以使用 Virtual Host 做出逻辑上的区分
    • 实际开发中,一个 Broker server也可能同时用来管理多个 业务线上的数据,就可以使用 Virtual Host 做出逻辑上的区分。
  • 交换机(Exchange)
    • 生产者把消息投递给 Broker Server,实际上是把消息先交给了 (公司某一层楼)Broker Server 上的交换机,再由交换机把消息交给对应的队列。 (交换机类似于“前台小姐姐”)
  • 队列(Queue)
    • 真正用来存储处理消息的实体,后续消费者也是从对应的队列中取数据
    • 一个大的消息队列中,可以有很多具体的小队列
  • 绑定(Binding)
    • 把交换机和队列之间,建立关系。
    • 可以把 交换机 和 队列 视为,数据库中 多对多的关系。可以想象,在 MQ 中,也是有一个这样的中间表,所谓的 “绑定’其实就是中间表中的一项
  • 消息(Message)
    • 具体来说,是 服务器A 发给 B 的请求(通过MQ转发), 服务器B 给 服务器A返回的响应(通过MQ转发)
    • 一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义

image.png

核心API

消息队列服务器(Broker Server),要提供的核心API

  • 创建队列(queueDeclare)
    • 此处不用 Create这样的术语,原因是Create仅仅是创建;而 Declare 起到的效果是,不存在则创建,存在就啥也不做
  • 销毁队列(queueDelete)
  • 创建交换机(exchangeDeclare)
  • 销毁交换机(exchageDelete)
  • 创建绑定(queueBind)
  • 解除绑定(queueUnbind)
  • 发布消息(basicPublish)
  • 订阅消息(basicConsume)
  • 确认消息(basicAck)
    • 这个API起到的效果,是可以让消费者显式的告诉 broker server,这个消息我处理完毕了,提高整个系统的可靠性~保证消息处理没有遗漏
    • RabbitMQ 提供了 肯定 和 否定的 确认,此处我们项目就只有 肯定确认
交换机类型

交换机在转发消息的时候,有一套转发规则的~
提供了几种不同的 交换机类型 (ExchangType)来描述这里不同的转发规则
Rabbit主要实现了四种交换机类型(也是由 AMQP协议定义的)

  • Direct 直接交换机
  • Fanout 扇出交换机
  • Topic 主题交换机
  • Header 消息头交换机

项目中实现了前三种

  1. Direct 直接交换机
    1. 生产者发送消息时,会指定一个"目标队列"的名字(此时的 routingKey就是 队列的名字)
    2. 交换机收到后,就看看绑定的队列里面,有没有匹配的队列
    3. 如果有,就转发过去(把消息塞进对应的队列中)
    4. 如果没有,消息直接丢弃image.png
  2. Fanout 扇出交换机
    1. 会把消息放到交换机绑定的每个队列
    2. 只要和这个交换机绑定任何队列都会转发消息image.png
  3. Topic 主题交换机

有两个关键概念

  1. bindingKey:把队列和交换机绑定的时候,指定一个单词(像是一个暗号一样)
  2. routingKey:生产者发送消息的时候,也指定一个单词
  3. 如果当前 bindingKey 和 routingKey 对上了,就可以把消息转发到对应的队列image.png
  4. 上述三种交换机类型,就像QQ群发红包
  • 专属红包 ======== 直接交换机
  • 发个10块钱红包,大家都能领 10块钱红包 ======== 扇出交换机
  • 我发个口令红包,只有输入对应口令才能领导红包 ======== 主题交换机
持久化

上述 虚拟机、交换机、队列、绑定、消息,需要存储起来。此时内存和硬盘各存储一份,内存为主,硬盘为辅。

  • 交换机、队列、绑定:存储在数据库中
  • 消息:存储在文件中

在内存中存储的原因:
对于 MQ 来说,能够高效的转发处理数据,是非常关键的指标! 因此对于使用内存来组织数据,得到的效率,就比放硬盘要高很多
在硬盘中存储原因:
为了防止内存中数据随着进程重启/主机重启而丢失

网络通信

其他的服务器(生产者/消费者)通过网络,和咱们的 Broker Server 进行交互的。
此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作

应用层协议主要工作:就是让客户端可以通过网络,调用 brokerserver 提供的编程接口
image.png
因此,客户端这边也要提供上述API,只有服务器是真正干实事的;客户端只是发送/接受响应
image.png
虽然调用的客户端的方法,但是实际上好像调用了一个远端服务器的方法一样 (远程调用 RPC)

客户端除了提供上述9个方法之外,还需要提供 4个 额外的方法,支撑其他工作

    1. 创建 Connection
    1. 关闭 Connection
    • 此处用的 TCP 连接,一个 Connection 对象,就代表一个 TCP连接
    1. 创建 Channel
    • 一个Connection 里面包含多个 Channel,每个 Channel 上传输的数据都是互不相干的
    • TCP中,建立/断开一个连接,成本挺高的,因此很多时候不希望频繁建立断开 TCP 连接
    • 所以定义一个 Channel ,不用的时候,销毁 Channel,此处 Channel 是逻辑概念,比 TCP 轻量很多
    1. 关闭 Channel
消息应答模式
  1. 自动应答,消费者把这个消息取走了,就算应答了
  2. 手动应答,basicAck 方法属于手动应答(消费者需要主动调用这个 API 来进行应答)
总结

需要做哪些工作?

  1. 需要实现 生产者,消费者,brokerserver 三个部分
  2. 针对生产者消费者来说,主要编写的是 客户端和服务器的通信部分,给客户端提供一组 api,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法
    1. 比如创建交换机,客户端这边只需要提供相关参数即可,然后通过 socket 将 request 传入到网卡中,然后服务器从 网卡中读取 request 解析。然后计算请求得到 response,再通过 socket 写回去网卡。
  3. 实现 brokerserver 【重点】image.png
  4. 持久化

上述的这些关键数据,在硬盘中怎么存储,啥格式存储,存储在数据库还是文件?
后续服务器重启了,如何读取这些数据,把内存中内容恢复过来?

模块划分

点击查看【processon】

创建核心类

Exchange

image.png

MSGQueue

image.png

Binding

image.png

Message

image.png

数据库操作

建表操作

此处考虑的是更轻量的数据库SQLite, 因为一个完整的 SQLite 数据库,只有一个单独的可执行文件(不到1M)

  1. 直接在pom.xml文件中引入
        <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.42.0.0</version>
        </dependency>
  1. 然后在 application.yml配置文件中
spring:
  datasource:
    url: jdbc:sqlite:./data/meta.db
    username:
    password:
    driver-class-name: org.sqlite.JDBC

上述依赖和配置都弄完后,当程序启动时,会自动建立数据库。所以我们只需要建表就行。
此处我们根据之前的需求分析,建立三张表,此处我们通过 代码形式来建造三张表

  1. 配置application.yml
mybatis:
  mapper-locations: classpath:mapper/**Mapper.xml
  1. 创建一个对应的 interface

image.png

  1. 创建 mapper目录和文件 MetaMapper.xml

image.png

交换机操作
  1. 在接口先写方法
void insertExchange(Exchange exchange);
List<Exchange> selectAllExchanges();
void deleteExchange(String exchangeName);
  1. 在 xml 中写
<insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange">
  insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});
</insert>

<select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange">
  select * from exchange;
</select>

<delete id="deleteExchange" parameterType="java.lang.String">
  delete from exchange where name = #{exchangeName};
</delete>

队列操作
  1. 在接口中先写方法
void insertQueue(MSGQueue queue);
List<MSGQueue> selectAllQueues();
void deleteQueue(String queueName);
  1. 在xml中写
<insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue">
  insert into queue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});
</insert>

<select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue">
  select * from queue;
</select>

<delete id="deleteQueue" parameterType="java.lang.String">
  delete from queue where name = #{queueName};
</delete>

绑定操作
  1. 在接口中先写方法
void insertBinding(Binding binding);
List<Binding> selectAllBindings();
void deleteBinding(Binding binding);
  1. 在xml中写
<insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding">
  insert into binding values (#{exchangeName},#{queueName},#{bindingKey});
</insert>

<select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding">
  select * from binding;
</select>

<delete id="deleteBinding" parameterType="java.lang.String">
  delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
</delete>

一个统一的类进行数据库操作

在服务器(BrokerServer)启动的时候,能够做出以下逻辑判定:

    1. 如果数据库存在,表也都有了,不做任何操作
    1. 如果数据库不存在,则创建库,创建表,构造默认数据

构造一个类 DataBaseManager
image.png

package com.example.mq.mqserver.datacenter;
import com.example.mq.MqApplication;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.mapper.MetaMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.lang.reflect.Field;
import java.util.List;

/**
 * 通过这个类,来整合数据库操作
 */
public class DataBaseManager {
    private MetaMapper metaMapper;
    // 针对数据库进行初始化
    public void init(){
        // 要做的是从 Spring 获取到现成的对象
        metaMapper = MqApplication.context.getBean(MetaMapper.class);
        if(!checkDBExists()){
            // 数据库不存在,就进行建库建表操作
            // 先创建一个 data 目录
            File dataDir = new File("./data");
            dataDir.mkdirs();
            // 创建数据表
            createTable();
            // 插入默认数据
            createDefaultData();
            System.out.println("[DataBaseManager] 数据库初始化完成!");
        }else {
            // 数据库已经存在,则什么都不做
            System.out.println("[DataBaseManager] 数据库已经存在!");
        }
    }
    public void deleteDB(){
        File file = new File("./data/meta.db");
        boolean ret = file.delete();
        if (ret){
            System.out.println("[DataBaseManager] 删除数据库文件成功!");
        }else {
            System.out.println("[DataBaseManager] 删除数据库文件失败!");
        }
        File dataDir = new File("./data");
        ret = dataDir.delete();
        if (ret){
            System.out.println("[DataBaseManager] 删除数据库目录成功!");
        }else {
            System.out.println("[DataBaseManager] 删除数据库目录失败!");
        }
    }
    private boolean checkDBExists() {
        File file = new File("./data/meta.db");
        if (file.exists()){
            return true;
        }
        return false;
    }
    // 这个方法用来建表
    // 建库操作并不需要手动执行(不需要手动创建 meta.db 文件)
    // 首次执行这里的数据库操作的时候,就会自动创建 meta.db 文件 (mybatis 帮我们完成的)
    private void createTable() {
        metaMapper.createExchangeTable();
        metaMapper.createQueueTable();
        metaMapper.createBindingTable();
        System.out.println("[DataBaseManager] 创建表完成!");
    }

    // 给数据库表中,添加默认的值
    // 此处主要是添加一个默认的交换机
    // RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机,类型是 DIRECT
    private void createDefaultData() {
        // 构造一个默认交换机
        Exchange exchange = new Exchange();
        exchange.setName("");
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        metaMapper.insertExchange(exchange);
        System.out.println("[DataBaseManager] 创建初始数据完成");
    }
    // 把其他的数据库操作,也在这个类封装下
    public void insertExchange(Exchange exchange){
        metaMapper.insertExchange(exchange);
    }
    public List<Exchange> selectAllExchanges(){
        return metaMapper.selectAllExchanges();
    }
    public void deleteExchange(String exchangeName){
        metaMapper.deleteExchange(exchangeName);
    }
    public void insertQueue(MSGQueue queue){
        metaMapper.insertQueue(queue);
    }
    public List<MSGQueue> selectAllQueues(){
        return metaMapper.selectAllQueues();
    }
    public void deleteQueue(String queueName){
        metaMapper.deleteQueue(queueName);
    }
    public void insertBinding(Binding binding){
        metaMapper.insertBinding(binding);
    }
    public List<Binding> selectAllBindings(){
        return metaMapper.selectAllBindings();
    }
    public void deleteBinding(Binding binding){
        metaMapper.deleteBinding(binding);
    }
}

消息持久化

消息存储格式

Message,如何在硬盘上存储?

  1. 消息操作并不涉及到复杂的增删改查
  2. 消息数量可能会非常多,数据库的访问效率并不高

所以要把消息直接存储在文件中
以下设定消息具体如何在文件中存储~

消息是依托于队列的,因此存储的时候,就要把 消息 按照 队列 维度展开
此处已经有了一个 data 目录(meta.db就在这个目录中)
在 data 中创建一些子目录,每个队列对应一个子目录,子目录名就是队列名

image.png

queue_data.txt:这个文件里面存储的是二进制的数据,我们约定转发到这个队列的队列所有消息都是以二进制的方式进行存储
image.png
首先规定前4个字节代表的该消息的长度,后面紧跟着的是消息本体。
对于BrokerServer来说,消息是需要新增和删除的。
生产者生产一个消息,就是新增一个消息
消费者消费一个消息,就是删除一个消息
对于内存中的消息新增删除就比较容易了:使用一些集合类就行
对于文件中新增:
我们采用追加方式,直接在当前文件末尾新增就行
对于文件中删除:
如果采用真正的删除,效率就会非常低。将文件视为顺序表结构,删除就会涉及到一系列的元素搬运。
所以我们采用逻辑删除的方式。根据消息中的一个变量 isValid 判断该消息是否有效,1 为有效消息;0 为
无效消息

那么如何找到每个消息对应在文件中的位置呢? 我们之前在 Message 中设置了两个变量,一个是 offsetBeg,一个是 offsetEnd。
我们存储消息的时候,是同时在内存中存一份和硬盘中存一份。而内存中存到那一份消息,记录了当前的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息,再根据该消息的两个变量值,就能找到硬盘中的消息数据了。

image.png

垃圾回收

随着时间的推移,文件中存放的消息可能会越来越多。并且可能很多消息都是无用的,所以就要针对当前消息数据文件进行垃圾回收。

此处我们采用的复制算法,原理也是比较容易理解的 (复制算法:比较适用的前提是,当前的空间,有效数据不多,大多数都是无效的数据)
直接遍历原有的消息数据文件,把所有的有效数据数据重新拷贝一份到新的文件中,新文件名字和原来文件名字相同,再把旧的文件直接删除掉。

image.png
image.png

那么垃圾回收的算法有了,何时触发垃圾回收?

此处就要用到我们每个队列目录中,所对应的另一个文件 queue_stat.txt了,使用这个文件来保存消息的统计信息
只存一行数据,用 \t 分割, 左边是 queue_data.txt 中消息的总数目,右边是 queue_data.txt中有效的消息数目。 形如 2000\t1500, 代表该队列总共有2000条消息,其中有效消息为1500条
所以此处我们就约定,当消息总数超过2000条,并且有效消息数目低于总消息数的50%,就处罚一次垃圾回收GC

如果当一个文件消息数目非常的多,而且都是有效信息,此时会导致整个消息的数据文件非常庞大,后续针对这个文件操作就会非常耗时。假设当前文件已经达到10个G了,那么此时如果触发一次GC,整个耗时就会非常高。

对于RabbitMQ来说,解决方案:
文件拆分:当某个文件长度达到一定的阈值的时候,就会拆分成两个文件(拆着拆着就成了很多文件)
文件合并:每个单独的文件都会进行GC,如果GC之后,发现文件变小了,就会和相邻的其他文件合并
这样做,可以保证在消息特别多的时候,也能保证性能上的及时响应
实现思路:

  1. 用一个专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息的数目是多少,无效消息是多少
  2. 设计策略:什么时候触发文件拆分,什么时候触发文件合并
统计文件读写

需要定义一个内部类,在表示该队列的统计消息,此处优先考虑 static 静态内部类

static public class Stat {
    // 此处直接定义成 public
    public int totalCount;  // 总的消息数
    public int validCount; // 有效消息数
}
  1. 统计文件的读
private Stat readStat(String queueName) {
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {
    Scanner scanner = new Scanner(inputStream);
    stat.totalCount = scanner.nextInt();
    stat.validCount = scanner.nextInt();
    return stat;
} catch (IOException e) {
    e.printStackTrace();
}
return null;
}
  1. 统计文件的写
private void writeStat(String queueName, Stat stat) {
    // 使用 PrintWrite 来写文件
    // OutputStream 打开文件,默认情况下,会直接把源文件清空,此时就相当于 新数据把旧的数据覆盖了
    // 加个 参数 true,就会变成追加 new FileOutputStream(getQueueStatPath(queueName),true)
    try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {
        PrintWriter printWriter = new PrintWriter(outputStream);
        printWriter.write(stat.totalCount + "\t" + stat.validCount);
        printWriter.flush();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

创建消息目录和文件
  1. 先创建队列对应的目录(以队列名字为名的目录)
  2. 创建队列里面的消息数据文件
  3. 创建队列里面的消息统计数据文件
  4. 给消息统计文件设置初始值
// 创建队列对应的文件目录
public void createQueueFiles(String queueName) throws IOException {
    // 1. 先创建队列对应的消息目录
    File baseDir = new File(getQueueDir(queueName));
    if (!baseDir.exists()) {
        // 不存在就创建这个目录
        Boolean ok = baseDir.mkdirs();
        if (!ok) {
            throw new IOException("创建目录失败!baseDir=" + baseDir.getAbsolutePath());
        }
    }
    // 2. 创建队列数据文件
    File queueDataFile = new File(getQueueDataPath(queueName));
    if (!queueDataFile.exists()) {
        Boolean ok = queueDataFile.createNewFile();
        if (!ok) {
            throw new IOException("创建文件失败! queueDateFile=" + queueDataFile.getAbsolutePath());
        }
    }
    // 3. 创建消息统计文件
    File queueStatFile = new File(getQueueStatPath(queueName));
    if (!queueStatFile.exists()) {
        Boolean ok = queueStatFile.createNewFile();
        if (!ok) {
            throw new IOException("创建统计文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
        }
    }

    // 4. 给消息统计文件,设置初始值
    Stat stat = new Stat();
    stat.totalCount = 0;
    stat.validCount = 0;
    writeStat(queueName, stat);
}

删除消息目录和文件
  1. 先删除消息的统计文件和消息数据文件
  2. 再删除队列目录
// 删除队列的目录和文件
// 队列也是可以被删除的,当队列删除后,对应的目录文件,也需要随之删除
public void destroyQueueFiles(String queueName) throws IOException {
    File queueStatFile = new File(getQueueStatPath(queueName));
    boolean ok1 = queueStatFile.delete();
    File queueDataFile = new File(getQueueDataPath(queueName));
    boolean ok2 = queueDataFile.delete();
    File baseDir = new File(getQueueDir(queueName));
    boolean ok3 = baseDir.delete();
    if (!ok1 || !ok2 || !ok3) {
        throw new IOException("删除目录和文件失败!baseDir=" + baseDir.getAbsolutePath());
    }
}

消息序列化

把一个对象(结构化数据)转换成一个 字符串/字节数组
序列化之后方便 存储和传输

  • 存储:一般存储在文件中,文件只能存字符串/二进制数据。不能直接存对象
  • 传输:在网络中传输,socket

此处不使用 json 进行序列化,由于 Message,里面存储是二进制数据。
而jason序列化得到的结果是文本数据,里面无法存储二进制的body
image.png

针对序列化,有很多解决方案

  1. Java标准库提供了序列化方案。 ObjectInputStream 和 ObjectOutputStream
  2. Hessian 也是一个解决方案
  3. protobuffer
  4. thrift

此处咱使用第一种 Java 标准库自带的

  • 序列化
// 把一个对象序列化为字节数组
public static byte[] toBytes(Object object) throws IOException {

    // 这个流对象相当于一个变长字节数组
    // 就可以把 object 序列化的数据给逐渐写入到 byteArrayOutputStream 中,再统一转成 byte[]
    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        // ObjectOutputStream(byteArrayOutputStream)) 此处括号里的内容,可根据实际需求修改,如果需要 关联文件就写到文件里面
        // 如果关联 网络就写到网络,此处写入的是内存中的 字节数组
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
            // 此处的 writeObject 就会把该对象进行序列化,生成二进制数据,就会写入到
            // objectOutputStream 中
            // 由于 objectOutputStream 又是关联到了 byteArrayOutputStream,最终结果就会写入到 byteArrayOutputStream
            objectOutputStream.writeObject(object);
        }
        // 这个操作就是把 byteArrayOutputStream 二进制数据取出来 转换成 byte[]
        return byteArrayOutputStream.toByteArray();
    }
}
  • 反序列化
// 把一个字节数组反序列化成对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
    Object object = null;
    try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
            // 此处的 readObject 就是从 data 这个 byte[] 中读取数据并进行反序列化
            object = objectInputStream.readObject();
        }
    }
    return object;
}

把消息写入到文件中

这个要将消息存入到该队列对应的文件中。
需要注意的是:此处 写入消息 需要两个参数,一个是 队列 MSGQueue,一个是消息 Message

  1. 先判断当前写入队列的文件在不在
  2. 把 Message 对象进行序列化,转换成二进制的字节数组
  3. 进行写入操作的操作时候要进行加锁(锁对象就是当前 MSGQueue),此处如果不加锁。当多个客户端进行发送消息的时候,可能会造成数据不对。
  4. 先获取当前队列消息数据文件的长度,用这个长度来计算 offsetBeg 和 offsetEnd
    1. 设置该消息 offsetBeg = 当前文件长度 + 4
    2. 设置该消息 offsetEnd = 当前文件长度 + 4 + 当前二进制数组长度
  5. 把新的 message数据,写入到文件的末尾处,采用追加方式
    1. 先写入4个字节的消息长度
    2. 再写入消息本体
  6. 更新统计文件,并重新写入
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
    // 1. 检查下当前要写入的队列 对应的文件是否存在
    if (!checkFilesExists(queue.getName())) {
        throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName=" + queue.getName());
    }
    // 2. 把对象进行序列化,转换成二进制的字节数组
    byte[] messageBinary = BinaryTool.toBytes(message);

    synchronized (queue) {
        // 3. 先获取到当前队列数据文件的长度,用这个长度来计算该 Message 对象和 offsetBeg offsetEnd
        // 把新的 message 数据,写入到队列的文件末尾,
        // 此时,message 对象的 offsetBeg 就是 当前文件长度+4
        // offsetEnd 就是 当前文件长度 + 4 + message自身长度
        File queueDataFile = new File(getQueueDataPath(queue.getName()));
        // 通过这个方法 queueDataFile.length() 就能获取到长度,单位字节
        message.setOffsetBeg(queueDataFile.length() + 4);
        message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);
        // 4. 写入消息数据到文件,注意,此处是追加
        try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
            try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                // 接下来首先写入的是当前消息的长度,占领4个字节
                dataOutputStream.writeInt(messageBinary.length);
                // 写入消息本体
                dataOutputStream.write(messageBinary);
                // TODO
            }
        }
        // 5. 更新统计文件
        Stat stat = readStat(queue.getName());
        stat.totalCount += 1;
        stat.validCount += 1;
        // 重新写入
        writeStat(queue.getName(), stat);
    }
}

从文件中删除消息(逻辑删除)
  1. 先从硬盘中读取出来
    1. 此处采用 RamdomAccessFile 来读取(可以在文中指定位置,进行读写,随机访问)
    2. 先定义一个 以消息长度为length【offsetEnd - offsetBeg】的一个字节数组 bufferSrc
    3. 再根据要删除的 Message 对象中的 offsetBeg 和 offsetEnd 将光标定位那个位置
    4. 然后将结果读取到 bufferSrc中
  2. 然后将读到的bufferSrc数据反序列化成 Message对象,修改变量 isValid=0x2
  3. 再将 Message对象 序列化成 bufferDes
    1. 重新定位光标到消息的 offserBeg
    2. 将 bufferDes 写回去
  4. 更新统计文件信息,写入
// 这是消息删除方法
// 这里删除是逻辑删除,也就是把硬盘上的message对象里面的 isValid,设置成0
// 1. 先把文件从硬盘中读取出来
// 2. 然后修改 isValid
// 3. 再写回到硬盘中
// 此处这个参数中的 message对象,必须包含有效的 offsetBeg 和 offsetEnd
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
    synchronized (queue) {
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
            // 1. 先读取对应 数据
            byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
            randomAccessFile.seek(message.getOffsetBeg());
            randomAccessFile.read(bufferSrc);
            // 2. 读取当前的二进制数据,转换成 Message 对象 并修改 isValid
            Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
            // 此处不需要给参数的 message 的 isValid 设置成0,因为这个参数是在内存中管理的 message对象,
            // 而这个对象 也要被马上删除了
            diskMessage.setIsValid((byte) 0x0);
            // 3. 写回去
            // 需要重新定位光标
            byte[] bufferDest = BinaryTool.toBytes(diskMessage);
            randomAccessFile.seek(message.getOffsetBeg());
            randomAccessFile.write(bufferDest);
        }

        // 还要更换统计文件
        Stat stat = readStat(queue.getName());
        if (stat.validCount > 0) {
            stat.validCount -= 1;
        }
        writeStat(queue.getName(), stat);
    }
}

从硬盘中恢复数据到内存

使用这个方法将硬盘中所有的有效数据加载到内中(具体来说是一个链表中)这个方法是在程序启动的时候调用。
这里使用 LinkedList来存储消息,方便后续进行头删操作
一个文件中会包含多个消息,需要循环去读取,此处手动记录光标位置

  1. 先读取4个字节,表示当前消息长度
  2. 然后根据当前消息长度,读取对应的长度到 buffer 字节数组中
  3. 把读取到 buffer 字节数据 反序列化成 Message 对象
  4. 判断这个 Message 对象里面的 isValid 是否为 0x1
  5. 如果不是,就 continue,是的话执行第六步,不是就从第一步开始
  6. 加入消息之前先设置 offsetBeg, offserEnd,然后将消息加入到 LinkedList中
  7. 如果读到末尾会有异常 EOF,会自动结束
// 使用这个方法,从文件中,读取所有的消息内容,加载到内存中(具体来说是一个链表中)
// 这个方法,准备在程序启动的时候,进行调用
// 这里 使用一个 LinkedList,主要目的是为了后续进行头删操作
// 这个方法的参数,只是一个 queueName,而不是 MsgQueue对象,因为不需要使用加锁
// 不涉及多线程操作
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
    LinkedList<Message> messages = new LinkedList<>();

    try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {
        try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {
            // 手动记录光标的位置
            long currentOffset = 0;
            // 一个文件可能包含多个消息,所以要循环读
            while (true) {
                // 1. 读取当前消息长度  , 一次读4个字节  (这里的 readInt 可能会读到文件的末尾)
                // 读到末尾就会抛出 EOFException 异常
                int messageSize = dataInputStream.readInt();
                // 2. 按照这个长度,读取消息内容
                byte[] buffer = new byte[messageSize];
                int actualSize = dataInputStream.read(buffer);
                if (messageSize != actualSize) {
                    throw new MqException("[MessageFileManager] 文件格式错误!queueName=" + queueName);
                }
                // 3. 把读到的二进制数据,反序列化为 Message 对象
                Message message = (Message) BinaryTool.fromBytes(buffer);
                // 4. 判定这个消息,是不是无效对象 isValid=0x2
                if (message.getIsValid() != 0x1) {
                    // 无效数据,跳过
                    continue;
                }
                // 5. 有效数据,则需要把这个 Message 对象加入到链表中,加入之前要先设置 offsetBeg, offsetEnd
                message.setOffsetBeg(currentOffset + 4);
                message.setOffsetEnd(currentOffset + 4 + messageSize);
                currentOffset += (4 + messageSize);
                messages.add(message);
            }
        } catch (EOFException e) {
            // 这个并非真是处理异常,处理正常业务逻辑
            // 文件读到末尾
            System.out.println("[MessageFileManager] 从硬盘恢复数据到内存完成!");
        }
    }
    return messages;
}

消息文件垃圾回收

由于当前会不停的往消息文件中写入消息,并且删除消息只是逻辑删除,这就可能导致消息文件越来越大,并且包含大量无用的消息。
此处使用的是复制算法。

  • 判定当前文件中消息总数超过2000,并且有效消息数不足50%,就会触发垃圾回收
  • 就把所有的有效消息提取出来,单独的在写到一个文件中,
  • 删除旧文件,使用新文件代替
  • 注意:还要更新统计文件信息
总结

MessageFileManager主要负责管理消息在文件中的存储~

  1. 设计目录结构和文件格式
  2. 实现了目录创建和删除
  3. 实现统计文件的读写
  4. 实现了消息的写入(按照之前的文件格式)
  5. 实现了消息的删除 (随机访问文件)
  6. 实现了所有消息的加载
  7. 垃圾回收(复制算法)

统一硬盘存储管理

上述我们存储在硬盘中的数据,分为了两个,一个是存放数据库中,一个是存放在文件中。
我们需要统一封装一个类对上面硬盘数据进行管理

package com.example.mq.mqserver.datacenter;

import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

/**
 * 使用这个类来管理所有硬盘上得数据
 * 1. 数据库:交换机、绑定、队列
 * 2. 数据文件:消息
 * 上层逻辑如果需要操作硬盘,统一都通过这个类来使用。(上层代码不关心当前数据是存储在数据库还是文件中)
 */
public class DiskDataCenter {
    private DataBaseManager dataBaseManager = new DataBaseManager();
    private MessageFileManager messageFileManager = new MessageFileManager();

    public void init(){
        // 针对上述两个实例进行初始化, 建库建表,创建默认交换机
        dataBaseManager.init();
        // 当前这个方法是空的,方便以后扩展
        messageFileManager.init();
    }

    // 封装交换机操作
    public void insertExchange(Exchange exchange){
        dataBaseManager.insertExchange(exchange);
    }

    public void deleteExchange(String exchangeName){
        dataBaseManager.deleteExchange(exchangeName);
    }

    public List<Exchange> selectAllExchanges(){
        return dataBaseManager.selectAllExchanges();
    }

    // 封装队列操作
    public void insertQueue(MSGQueue queue) throws IOException {
        dataBaseManager.insertQueue(queue);
        // 创建队列的同时,不仅仅要把队列对象写到数据库中,还需要创建出对应的目录和文件
        messageFileManager.createQueueFiles(queue.getName());
    }

    public void deleteQueue(String queueName) throws IOException {
        dataBaseManager.deleteQueue(queueName);
        // 删除队列的时候,也要同时删除队列对应的目录和文件
        messageFileManager.destroyQueueFiles(queueName);
    }

    public List<MSGQueue> selectAllQueues(){
        return dataBaseManager.selectAllQueues();
    }

    // 封装绑定操作
    public void insertBinding(Binding binding){
        dataBaseManager.insertBinding(binding);
    }

    public void deleteBinding(Binding binding){
        dataBaseManager.deleteBinding(binding);
    }

    public List<Binding> selectAllBindings(){
        return dataBaseManager.selectAllBindings();
    }

    // 封装消息的操作
    public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
        messageFileManager.sendMessage(queue,message);
    }

    public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {
        messageFileManager.deleteMessage(queue,message);
        if (messageFileManager.checkGC(queue.getName())){
            messageFileManager.GC(queue);
        }
    }
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        return messageFileManager.loadAllMessageFromQueue(queueName);
    }
}

内存数据管理

设计数据结构

使用内存管理上述的数据,对于MQ来说,内存存储数据为主;硬盘存储数据为辅(主要是为了持久化,重启之后,数据不丢失)

  • 交换机:Key:交换机名字;Value:交换机
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
  • 队列: Key:队列名称; Value:队列
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
  • 绑定:Key:交换机名字 ;Value.key:队列名字;Value.value:绑定关系
private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
  • 消息:Key:MessageId;Value:Message对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
  • 表示队列和消息之间的关联:Key:队列名字;Value: 是一个存储消息的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
  • 表示未被确认的消息:存储了哪些消息被消费者取走,但是还没应答
  • key:队列名称;Value.key:MessageId;Value.value:Message对象
private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
  • 此处咱们实现的MQ,支持两种应答模式(ACK)
    1. 自动应答:消费者取走元素,这个消息就是被应答了,就需要删除
    1. 手动应答:消费者取走元素,还不算应答,需要消费者再主动调用一个 basicAck 方法,此时才算真正应答了,才可以删除消息
实现交换机的管理
  1. 添加交换机
public void insertExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交换机添加成功!exchangeName=" + exchange.getName());
}

  1. 获取交换机
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}

  1. 删除交换机
public void deletaExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 交换机删除成功!exchangeName=" + exchangeName);
}

实现队列的管理
  1. 添加队列
public void insertQueue(MSGQueue queue) {
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter] 新队列添加成功!queueName=" + queue.getName());
}

  1. 获取队列
public MSGQueue getQueue(String queueName) {
return queueMap.get(queueName);
}

  1. 删除队列
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 队列删除成功!queueName=" + queueName);
}

实现绑定的管理
  1. 添加绑定
public void insertBinding(Binding binding) throws MqException {
    // 先使用 exchangeName 查一下,对应的 HashMap 是否存在,不存在就创建
    ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                                                                                k -> new ConcurrentHashMap<>());
    // 再根据 queueName 查一下,如果 binding 存在,就抛出异常,不存在才能插入
    synchronized (bindingMap) {
        if (bindingMap.get(binding.getQueueName()) != null) {
            throw new MqException("[MemoryDataCenter] 绑定已经存在!exchangeName=" + binding.getExchangeName() +
                                  ", queueName=" + binding.getQueueName());
        }
        bindingMap.put(binding.getQueueName(), binding);
    }
    System.out.println("[MemoryDataCenter] 新绑定添加成功!exchangeName=" + binding.getExchangeName()
                       + ", queueName=" + binding.getQueueName());
}

添加绑定要注意线程安全问题,此处需要以当前的 bindMap 为锁对象进行加锁!

  1. 获取绑定

根据交换机名字和队列名字获取唯一的绑定

public Binding getBinding(String exchangeName, String queueName) {
    ConcurrentHashMap<String, Binding> bindMap = bindingsMap.get(exchangeName);
    if (bindMap == null) {
        return null;
    }
    return bindMap.get(queueName);
}

获取一个交换机的所有绑定

public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
return bindingsMap.get(exchangeName);
}

  1. 删除绑定
public void deleteBinding(Binding binding) throws MqException {
    ConcurrentHashMap<String, Binding> bindMap = bindingsMap.get(binding.getExchangeName());
    if (bindMap == null) {
        // 该交换机没有绑定任何队列
        throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName=" + binding.getExchangeName() + ", queueName"
                              + binding.getQueueName());
    }
    bindMap.remove(binding.getQueueName());
    System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName=" + binding.getExchangeName()
                       + ", queueName=" + binding.getQueueName());
}

实现消息的管理
  1. 添加消息
public void addMessage(Message message) {
messageMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功!messageId=" + message.getMessageId());
}

  1. 根据 id 获取消息
public Message getMessage(String messageId) {
return messageMap.get(messageId);
}

  1. 根据 id 删除消息
public void removeMessage(String messageId) {
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息删除成功!messageId=" + messageId);
}

  1. 发送消息到指定队列 (队列和消息之间的关联)
public void sendMessage(MSGQueue queue, Message message) {
    // 把消息放到对应的队列中
    // 先根据队列名字,找到该队列对应的消息链表
    LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>()
                                                                  );
    synchronized (messages) {
        messages.add(message);
    }
    // 在这里把该消息也往消息中心插入一下
    // 这里就算消息中心已经存在消息,重复插入也没关系
    // messageId相同,对应的 message 的内容也一定是一样的(服务器代码不会对 Message 内容做出修改 basicProperties 和 body)
    addMessage(message);
    System.out.println("[MemoryDataCenter] 消息被投递到队列中!messageID=" + message.getMessageId() + ", queueName=" +
                       queue.getName());
}

此处发送消息到指定队列需要进行加锁操作,防止重复在该队列中插入消息

  1. 从队列中获取指定消息
public Message pollMessage(String queueName) {
// 根据队列名,查找一下,对应的队列的消息链表
LinkedList<Message> messages = queueMessageMap.get(queueName);
// 如果没找到说明,队列中没有任何消息
if (messages == null) {
    return null;
}
synchronized (messages) {
    if (messages.size() == 0) {
        return null;
    }
    // 链表中有元素,就进行头删除
    Message curMessage = messages.remove(0);
    System.out.println("[MemoryDataCenter] 消息从队列中取出!messageId=" + curMessage.getMessageId());
    return curMessage;
}
}

此处需要进行加锁操作,两个线程同时获取的时候破坏链表结构

  1. 获取指定队列中的消息个数
public int getMessageCount(String queueName) {
LinkedList<Message> messages = queueMessageMap.get(queueName);
if (messages == null) {
    return 0;
}
if (messages.size() == 0){
    return 0;
}
synchronized (messages) {
    System.out.println("messageSize=" + messages.size());
    return messages.size();
}
}


实现待确认消息的管理
  1. 添加未确认的消息
public void addMessageWaitAck(String queueName, Message message) {
    ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,
                                                                                               k -> new ConcurrentHashMap<>());
    messageHashMap.put(message.getMessageId(), message);
    System.out.println("[MemoryDataCenter] 消息进入待确认队列!messageId=" + message.getMessageId());
}

  1. 删除待确认的消息(已经确认过的消息)
// 删除消息(已确认过)
public void removeMessageWaitAck(String queueName, String messageId) {
    ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
    if (messageHashMap == null) {
        return;
    }
    messageHashMap.remove(messageId);
    System.out.println("[MemoryDataCenter] 消息从待确认队列中删除!messageId=" + messageId);
}

  1. 获取到指定的待确认消息
public Message getMessageWaitAck(String queueName, String messageId) {
    ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
    if (messageHashMap == null) {
        return null;
    }
    return messageHashMap.get(messageId);
}

实现数据从硬盘中恢复

从硬盘中读取数据,把硬盘之前持久化存储的各个维度的数据恢复到内存中

  1. 清空之前集合中的数据
  2. 恢复所有的交换机数据
  3. 恢复所有的队列数据
  4. 恢复所有的绑定数据
  5. 恢复所有消息数据

注意:不需要恢复待确认的消息,因为在 当消息在等待 ACK的时候,服务器重启了。此时消息就相当于未被取走状态,而硬盘中存储的就是消息就是“未被取走”的。

总结

借助内存中的一些列数据结构 ,保存 交换机、队列、绑定、消息
广泛使用了 哈希表、链表、嵌套的数据结构等
线程安全:
要不要加锁?锁加到哪里?

虚拟主机的设计

类似于 MySQL 的 database,把交换机,队列,绑定,消息…进行逻辑上的隔离,一个服务器可以有多个虚拟主机~,此处我们项目就设计了一个虚拟主机(VirtualHost)
image.png

创建交换机(exchangeDelcare)

如何表示,交换机和虚拟主机之间的从属关系呢?

  • 方案一:参考数据库设计,“一对多”方案,比如给交换机表,添加个属性,虚拟主机 id/name
  • 方案二:交换机的名字 = 虚拟主机名字 + 交换机的真实名字

按照方案二,也可以去区分不同的队列,进一步由于,绑定和队列和交换机都相关,直接就隔离开了,
再进一步,消息和队列是强相关的,队列名区分开,消息自然区分开。

此时就可以区分不同虚拟主机的不同交换机的关系

  1. 把交换机名字加上虚拟主机名字作为前缀
  2. 判断交换机是否存在,直接通过内存查询
  3. 真正去构造交换机对象
  4. 当参数为 durable的时候,将交换机对象写入硬盘
  5. 将交换机写入内存
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
                               Map<String, Object> arguments) {
    // 把交换机的名字,加上虚拟主机作为前缀
    exchangeName = virtualName + exchangeName;
    try {
        synchronized (exchangeLocker) {
            // 1. 判定该交换机是否存在,直接通过内存查询
            Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
            if (existsExchange != null) {
                // 该交换机已经存在!
                System.out.println("[VirtualHost] 交换机已经存在!exchangeName=" + exchangeName);
                return true;
            }
            // 2. 真正创建交换机,先构造 Exchange 对象
            Exchange exchange = new Exchange();
            exchange.setName(exchangeName);
            exchange.setDurable(durable);
            exchange.setType(exchangeType);
            exchange.setAutoDelete(autoDelete);
            exchange.setArguments(arguments);
            // 3. 把交换机对象写入硬盘, durable为true时才写入
            if (durable) {
                diskDataCenter.insertExchange(exchange);
            }
            // 4. 把交换机写入内存
            memoryDataCenter.insertExchange(exchange);
            System.out.println("[VirtualHost] 交换机创建完成!exchangeName=" + exchangeName);
            // 上述逻辑,先写硬盘,再写内存。
            // 因为硬盘更容易写失败,一旦失败,就不写内存了
            // 要是先写内存,内存写成功了,硬盘写失败了,还需要把内存数据清理了,就比较麻烦
        }
        return true;
    } catch (Exception e) {
        System.out.println(("[VirtualHost] 交换机创建失败!exchangeName=" + exchangeName));
        e.printStackTrace();
        return false;
    }
}

删除交换机(exchangeDelete)
  1. 根据交换机的名字找到对应的交换机
  2. 删除硬盘数据
  3. 删除内存中数据
// 删除交换机
public boolean exchangeDelete(String exchangeName) {
exchangeName = virtualName + exchangeName;
try {
    synchronized (exchangeLocker) {
        // 1. 先找到对应的交换机
        Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
        if (toDelete == null) {
            throw new MqException("[VirtualHost] 交换机不存在!无法删除!exchangeName=" + exchangeName);
        }
        // 2. 删除硬盘数据
        if (toDelete.isDurable()) {
            diskDataCenter.deleteExchange(exchangeName);
        }
        // 3. 从内存中删除
        memoryDataCenter.deletaExchange(exchangeName);
        System.out.println("[VirtualHost] 交换机删除成功!exchangeName=" + exchangeName);
    }
    return true;
} catch (Exception e) {
    System.out.println(("[VirtualHost] 交换机删除失败!exchangeName=" + exchangeName));
    e.printStackTrace();
    return false;
}
}

创建队列(queueDelcare)
  1. 判断队列是否存在
  2. 不存在则创建队列,设定参数
  3. 队列参数 durable 为 true的时候存入硬盘
  4. 将队列写入到内存
// 创建队列
public boolean queueDelcare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) {
    // 把队列的名字,拼接上虚拟主机名字
    queueName = virtualName + queueName;
    try {
        synchronized (queueLocker) {
            // 1. 判定队列是否存在
            MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
            if (existsQueue != null) {
                System.out.println("[VirtualHost] 队列已经存在!queueName=" + queueName);
                return true;
            }
            // 2. 不存在则创建队列
            MSGQueue queue = new MSGQueue();
            queue.setName(queueName);
            queue.setDurable(durable);
            queue.setExclusive(exclusive);
            queue.setAutoDelete(autoDelete);
            queue.setArguments(arguments);
            // 3. 将队列写入到硬盘durable为true时才写入
            if (durable) {
                diskDataCenter.insertQueue(queue);
            }
            // 4. 将队列写入到内存
            memoryDataCenter.insertQueue(queue);
            System.out.println("[VirtualHost] 队列创建成功!queueName=" + queueName);
        }
        return true;
    } catch (Exception e) {
        System.out.println(("[VirtualHost] 队列创建失败!queueName=" + queueName));
        e.printStackTrace();
        return false;
    }
}

删除队列(queueDelete)
  1. 判断队列是否存在
  2. 存在则删除,先在硬盘删除
  3. 在内存中删除
// 队列删除
public boolean queueDelete(String queueName) {
queueName = virtualName + queueName;
try {
    synchronized (queueLocker) {
        // 1. 查询队列是否存在
        MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
        if (existsQueue == null) {
            throw new MqException("[VirtualHost] 队列不存在!无法删除!");
        }
        // 2. 存在进行,先在硬盘删除
        if (existsQueue.isDurable()) {
            diskDataCenter.deleteQueue(queueName);
        }
        // 3. 在内存中删除
        memoryDataCenter.deleteQueue(queueName);
        System.out.println("[VirtualHost] 队列删除成功!queueName=" + queueName);
    }
    return true;
} catch (Exception e) {
    System.out.println(("[VirtualHost] 队列删除失败!queueName=" + queueName));
    e.printStackTrace();
    return false;
}
}

创建绑定(queueBind)
  1. 判断当前绑定在不在
  2. 验证当前的 routingKey 合不合法
  3. 如果合法,就创建绑定,设置参数
  4. 从内存中获取下绑定关系的队列和交换机是否存在
  5. 都存在,再次判定队列和交换机的durable是否都为 true
  6. 都为 true 则存入硬盘
  7. 再写入内存
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {
    queueName = virtualName + queueName;
    exchangeName = virtualName + exchangeName;
    try {   
        synchronized (exchangeLocker) {
            synchronized (queueLocker) {
                // 1. 判断当前的绑定是否已经存在
                Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);
                if (existsBinding != null) {
                    throw new MqException("[VirtualHost] binding已经存在!exchangeName=" + exchangeName + ", queueName=" + queueName);
                }
                // 2. 验证 bindingKey 是否合法
                if (!router.checkBindingKey(bindingKey)) {
                    throw new MqException("[VirtualHost] bindingKey非法!bindingKey=" + bindingKey);
                }
                // 3. 不存在就创建绑定
                Binding binding = new Binding();
                binding.setQueueName(queueName);
                binding.setExchangeName(exchangeName);
                binding.setBindingKey(bindingKey);
                // 4. 获取下绑定 对应的 队列和交换机是否存在
                Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
                MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
                if (existsExchange == null) {
                    throw new MqException("[VirtualHost] 交换机不存在!exchangeName=" + exchangeName);
                }
                if (existsQueue == null) {
                    throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);
                }
                // 5. 先写入硬盘,需要判断当前 交换机和队列是否都持久化
                if (existsQueue.isDurable() && existsExchange.isDurable()) {
                    diskDataCenter.insertBinding(binding);
                }
                // 6. 再写入内存
                memoryDataCenter.insertBinding(binding);
                System.out.println("[VirtualHost]  绑定创建成功! exchangeName=" + exchangeName + ", queueName=" + queueName);
            }
        }
        return true;
    } catch (Exception e) {
        System.out.println("[VirtualHost]  绑定创建失败! exchangeName=" + exchangeName + ", queueName=" + queueName);
        e.printStackTrace();
        return false;
    }
}

删除绑定(queueUnbind)

有个依赖关系问题,就是 如果 线程A 先删除了队列,而此时另一个线程B 再去删除绑定消息时候,
就会失败,因为此时队列已经不存在了,此时需要解决方案

  • 方案一:参考类似于 MySQL 的外键一样,删除交换机/队列的时候,判定一下当前队列/交换机是否存在对应的绑定,如果存在,则禁止删除,要求先解除绑定,再尝试删除
  • 方案二:直接删除,不判断 交换机和队列是否存在
  1. 获取绑定是否存在
  2. 删除硬盘上的数据,需要判断该绑定 durable 是否为 true
  3. 从内存中删除绑定
// 删除绑定
public boolean queueUnbind(String exchangeName, String queueName) {
    exchangeName = virtualName + exchangeName;
    queueName = virtualName + queueName;
    try {
        synchronized (exchangeLocker) {
            synchronized (queueLocker) {
                // 1. 获取绑定是否存在
                Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);
                if (existsBinding == null) {
                    throw new MqException("[VirtualHost] 绑定不存在!无法删除!exchangeName=" + exchangeName +
                                          ", queueName=" + queueName);
                }
                //            // 2. 获取 对应的队列和交换机
                //            Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
                //            MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
                //            if (existsExchange == null) {
                //                throw new MqException("[VirtualHost] 交换机不存在!exchangeName=" + exchangeName);
                //            }
                //            if (existsQueue == null) {
                //                throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);
                //            }
                // 3. 删除硬盘上的数据 需要判断当前 交换机和队列都是持久化   
                diskDataCenter.deleteBinding(existsBinding);

                // 4. 从内存中删除绑定
                memoryDataCenter.deleteBinding(existsBinding);
                System.out.println("[VirtualHost] 删除绑定成功!");
            }
        }
        return true;
    } catch (Exception e) {
        System.out.println("[VirtualHost] 删除绑定失败!");
        return false;
    }
}

注意:考虑线程安全问题

发送消息(basicPublish)

image.png
发送消息的时候,会往 ConsumerManager类中的 阻塞队列中 BlockingQueue tokenQueue
存在该队列名,表示该队列存在消息~

// 发送消息到指定的交换机/队列中
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
    try {
        // 1. 转换交换机名字
        exchangeName = virtualName + exchangeName;
        // 2. 检查 routingKey 是否合法
        if (!router.checkRoutingKey(routingKey)) {
            throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
        }
        // 3. 查找交换机对象
        Exchange exchange = memoryDataCenter.getExchange(exchangeName);
        if (exchange == null) {
            throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
        }
        // 4. 判断交换机类型
        if (exchange.getType() == ExchangeType.DIRECT) {
            // 按照直接交换机的方式来转发消息
            // 以 routingKey 为队列名,直接将 消息写入 队列中
            // 此时可以无视绑定消息
            String queueName = virtualName + routingKey;
            // 5. 构造消息对象
            Message message = Message.createMessageWithId(routingKey, basicProperties, body);
            // 6. 查找该队列名所对应的 队列是否存在
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if (queue == null) {
                throw new MqException("[VirtualHost] 队列不存在!queueName=" + queueName);
            }
            // 7. 队列存在,则直接写入消息
            sendMessage(queue, message);

        } else {
            // 按照 FANOUT 和 TOPIC 方式
            // 5. 找到该交换机关联的所有绑定消息,并遍历这些绑定消息
            ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
            for (Map.Entry<String, Binding> entry : bindings.entrySet()) {
                // 1) 获取绑定对象,判断该队列是否存在
                Binding binding = entry.getValue();
                // 2) 查看当前绑定里面的队列名,能不能查到相应队列 (此处不设计转发规则)
                MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                if (queue == null) {
                    System.out.println("[VirtualHost] basicPublish 发送消息时,发现队列不存在!queueName=" + binding.getQueueName());
                    continue;
                }
                // 2) 构造消息对象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 3) 判断这个消息能否转发给这个队列
                // 如果是 fanout, 所有绑定的队列都要转发的
                // 如果是 topic, 还需要 判断 routingKey 和 BindingKey 是否匹配
                if (!router.route(exchange.getType(), binding, message)) {
                    continue;
                }
                // 4) 真正转发消息给队列
                sendMessage(queue, message);
            }
        }
        return true;
    } catch (Exception e) {
        return false;
    }
}

private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
    // 1. 此处发送消息,就是把消息 写入到 硬盘 和 内存
    int deliverMode = message.getDeliverMode();
    // deliverMode 为2持久化  为1不持久化
    if (deliverMode == 2) {
        diskDataCenter.sendMessage(queue, message);
    }
    // 写入内存
    memoryDataCenter.sendMessage(queue, message);

    //  通知消费者可以消费消息了
    consumerManager.notifyConsume(queue.getName());
}

Topic交换机转发规则
  • bindingKey(创建绑定的时候,给绑定指定的特殊字符串)

      1. 数字、字母、下划线
      1. 使用 . 把整个 routingKey 分成若干个部分 形如:aaa.vvv.eewe
      1. 支持两种特殊符号,作为通配符
      • 一个是 * 形如:aaa.*.bbb (只能作为被 . 分割单独的存在)
      • 一个 # 形如:aaa.#.bbb
  • routingKey (发布消息的时候,给消息上指定字符串)

      1. 数字、字母、下划线
      1. 使用 . 把整个 routingKey 分成若干个部分 形如:aaa.vvv.eew

image.png
上述规则,是根据 AMQP 协议规定的

验证 bindingKey 是否合法(checkBindingKey)

image.png

public boolean checkBindingKey(String bindingKey) {
// bindingKey的构造规则
// 1. 数字、字母、下划线
// 2. 使用 . 进行分割
// 3. 允许存在 * 和 #  作为通配符,但是只能作为独立的存在
if (bindingKey.length() == 0) {
    // 空字符串,也是合法情况,比如使用 DIRECT 或者 FANOUT, bindingKey 是用不上
    return true;
}
// 检查字符串中不存在非法字符
for (int i = 0; i < bindingKey.length(); i++) {
    char ch = bindingKey.charAt(i);
    // 判定该字母是否是大写字母
    if (ch >= 'A' && ch <= 'Z') {
        continue;
    }
    // 判定该字母是否是小写字母
    if (ch >= 'a' && ch <= 'z') {
        continue;
    }
    // 判定该字母是否是阿拉伯数字
    if (ch >= '0' && ch <= '9') {
        continue;
    }
    // 判定是否是 _ 或者 .
    if (ch == '_' || ch == '.' || ch == '#' || ch == '*') {
        continue;
    }
    return false;
}
// 检查 *  # 是否是独立的部分
// 由于 . 在正则表达式中是一种特殊符号,需要转义; 用 \. 但是在 Java中这又是个特殊字符;所以要用 \\.
String[] words = bindingKey.split("\\.");
for (String word : words) {
    // 检查 word 长度 是否大于1,并且包含了 * 或者 # ,就是非法
    if (word.length() > 1 && (word.contains("#") || word.contains("*"))) {
        return false;
    }
}
// 约定下,通配符之间的相邻关系
// 为啥这么约定?因为前三种相邻的时候,实现逻辑非常繁琐,同时功能性提升不大
// 1. aaa.#.#.bbb  -> 非法
// 2. aaa.#.*.bbb  -> 非法
// 3. aaa.*.#.bbb  -> 非法
// 4. aaa.*.*.bbb  -> 合法
for (int i = 0; i < words.length - 1; i++) {
    // 连续两个 #
    if (words[i].equals("#") && words[i].equals("#")) {
        return false;
    }
    // # *
    if (words[i].equals("#") && words[i].equals("*")) {
        return false;
    }
    // * #
    if (words[i].equals("*") && words[i].equals("#")) {
        return false;
    }
}
return true;
}

验证 routingKey 是否合法(checkRoutingKey)

image.png

// routingKey的构造规则
// 1. 数字、字母、下划线
// 2. 使用 . 分割成若干个部分
public boolean checkRoutingKey(String routingKey) {
if (routingKey.length() == 0) {
    // 空字符串,合法情况。比如使用 FANOUT 交换机的时候,routingKey 用不上,就可以设置成 ""
    return true;
}
for (int i = 0; i < routingKey.length(); i++) {
    char ch = routingKey.charAt(i);
    // 判定该字母是否是大写字母
    if (ch >= 'A' && ch <= 'Z') {
        continue;
    }
    // 判定该字母是否是小写字母
    if (ch >= 'a' && ch <= 'z') {
        continue;
    }
    // 判定该字母是否是阿拉伯数字
    if (ch >= '0' && ch <= '9') {
        continue;
    }
    // 判定是否是 _ 或者 .
    if (ch == '_' || ch == '.') {
        continue;
    }
    // 该字符不是上述任何一种,就不合法,直接返回 false
    return false;
}
return true;
}

匹配规则

image.png
image.png

private boolean routeTopic(Binding binding, Message message) {
    // 先把两个 Key 进行拆分
    String[] bindingTokens = binding.getBindingKey().split("\\.");
    String[] routingTokens = message.getRoutingKey().split("\\.");
    // 引入两个下标,指向两个数组,初始情况下都为 0
    int bindingIndex = 0;
    int routingIndex = 0;
    while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {
        // 【情况二】 遇到 *
        if (bindingTokens[bindingIndex].equals("*")) {
            bindingIndex++;
            routingIndex++;
            // 【情况三】 遇到 #
        } else if (bindingTokens[bindingIndex].equals("#")) {
            bindingIndex++;
            // 【情况四】 # 后面没有内容
            if (bindingIndex == bindingTokens.length) {
                // 说明该 # 后面没有东西了 匹配成功
                return true;
            }
            // 后面还有东西,拿着这个内容去 routingTokens 中找,找到对应位置
            // 使用 findNextMatch 这个方法用来查找该部分 在 routingTokens 中的位置,并返回下标; 没找到返回 -1
            routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);
            if (routingIndex == -1) {
                // 没找到匹配结果,返回 false
                return false;
            }
            // 找到匹配结果,继续往下匹配
            bindingIndex++;
            routingIndex++;
        } else {
            // 【情况一】普通字符串需要一模一样
            if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {
                return false;
            }
            bindingIndex++;
            routingIndex++;
        }
    }
    // 判定双方是否同时到达末尾 【情况五】
    if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {
        return true;
    }
    return false;
}

订阅消息(basicComsume)

什么是函数式接口

由于 Java的函数不能脱离类的存在, 为了实现 lambda, Java 引入了函数式接口
lambda的本质(底层实现)

  1. interface
  2. 只能有一个方法
  3. 还需要加 @FunctionalInterface 注解

一个虚拟主机中,有很多队列,每个队列上都有很多条消息。
那么针对是哪个消费者订阅了哪条队列的消息需要进行一个管理。
image.png

推送给消费者消息的基本实现思路
  1. 让 brokerserver把哪些消费者管理好
  2. 收到对应的消息,把消息推送给消费者

消费者是以队列为维度来订阅消息的,一个队列可以有多个消费者(此处我们约定按照轮询的方式来进行消费)。
image.png

实现一个类(完成消费者消费消息核心逻辑)

image.png

package com.example.mq.mqserver.datacenter;

import com.example.mq.common.Consumer;
import com.example.mq.common.ConsumerEnv;
import com.example.mq.common.MqException;
import com.example.mq.mqserver.VirtualHost;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;

import java.util.concurrent.*;

/**
 * 通过这个类,来实现消费消息的核心逻辑
 */
public class ConsumerManager {
    // 持有上层的 VirtualHost 对象的引用,用来操作数据
    private VirtualHost parent;

    // 指定一个线程池,执行具体的回调任务
    private ExecutorService workPool = Executors.newFixedThreadPool(4);
    // 存放一个 令牌(queueName)的队列
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
    // 扫描线程
    private Thread scanThread = null;

    public ConsumerManager(VirtualHost p) {
        parent = p;
        scanThread = new Thread(() -> {
            while (true) {
                try {
                    // 1. 从阻塞队列中 拿到 队列名字
                    String queueName = tokenQueue.take();
                    // 2. 根据队列名字找到队列
                    MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null) {
                        throw new MqException("[ConsumerManager] 取令牌后发现,队列名不存在!queueName=" + queueName);
                    }
                    // 3. 从队列中消费消息
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    e.printStackTrace();
                }
            }
        });
        // 把线程设为后台线程
        scanThread.setDaemon(true);
        scanThread.start();
    }


    //
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

    // 记录当前队列有哪些消费者订阅了
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 找到对应的队列
        MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
        if (queue == null) {
            throw new MqException("[ConsumerManager] 队列不存在!queueName=" + queueName);
        }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer);
        synchronized (queue) {
            queue.addConsumerEnv(consumerEnv);
            // 如果当前队列中已经有消息了,需要立即消费掉
            int messageCount = parent.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < messageCount; i++) {
                // 这个方法调用一次就消费一条消息
                consumeMessage(queue);
            }
        }
    }

    private void consumeMessage(MSGQueue queue) {
        // 1. 先按照轮询的方式,找个消费者出来
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if (luckyDog == null) {
            // 当前队列没有消费者,暂时不消费
            return;
        }
        // 2. 从队列中取出一个消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null) {
            // 当前队列中没有消息,就不消费
            return;
        }
        // 3. 把消息带入到消费者的回调函数中,丢给线程池执行
        workPool.submit(() -> {
            try {
                // 1. 把消息放入到待确认集合中, 这个操作在执行回调之前
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
                // 2. 真正执行回调
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumeTag(), message.getBasicProperties(), message.getBody());
                // 3. 如果当前是自动应答,此时就可以消息删除
                // 手动应答,先不处理,交给后续消费者调用 basicAck来处理
                if (luckyDog.isAutoAck()) {
                    // 1) 删除硬盘
                    if (message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue, message);
                    }
                    // 2) 删除待确认集合
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // 3) 删除内存中消息中心
                    parent.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费!queueName=" + queue.getName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

订阅消息的核心逻辑,就是调用 consumerManager.addConsumer方法,并传入参数(consumerTag、queueName、autoAck、consumer【回调函数】)。
这个方法的底层是

  1. 根据传入的 queueName查到该队列
  2. 然后创一个身份者表示 ConsumerEnv,存入到该队列的 ConsumerEnvList中
  3. 判断该队列中时候存在消息,已经存在的话,就consumeMessage消费完全部消息(按照轮询方式)

关于消息确认

能够确保消息是被正确的消费掉了,消费者的回调函数,顺利执行完了(中间没有抛出异常)
这条消息就可以被删除了。
消息确认也就是为了保证“消息不丢失”
为了达成消息不丢失这样的效果,这样处理:

  • 在真正执行回调之前,把这个消息先放到 “待确认的集合”中~image.png
  • 真正回调
  • 当前消费者采取的是 autoAck=true,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息
    • 硬盘
    • 内存消息中心哈希表
    • 待确认消息集合
  • 当前消费者采取的是 autoAck=false,手动应答,就需要消费者再回调函数内部,显式调用 basicAck这个核心API

basicAck实现原理,比较简单,当传入参数 autoAck=false, 就手动再回调函数的时候,调用 basicAck 就行

  1. 传入queueName和messageId
  2. 获取到队列和消息
  3. 删除硬盘中数据
  4. 删除内存中心的消息数据
  5. 删除待确认集合中的消息数据
// 确认消息
public boolean basicAck(String queueName, String messageId) {
    try {
        // 1. 获取到消息和队列
        queueName = virtualName +queueName;
        Message message = memoryDataCenter.getMessage(messageId);
        if (message == null) {
            throw new MqException("[VirtualHost] 要确认的消息不存在!messageId=" + messageId);
        }
        MSGQueue queue = memoryDataCenter.getQueue(queueName);
        if (queue == null) {
            throw new MqException("[VirtualHost] 要确认的队列不存在!messageId=" + messageId);
        }
        // 2. 删除硬盘上数据
        if (message.getDeliverMode() == 2) {
            diskDataCenter.deleteMessage(queue, message);
        }
        // 3. 删除内存中心数据
        memoryDataCenter.removeMessage(messageId);
        // 4. 删除待确认集合中的数据
        memoryDataCenter.removeMessageWaitAck(queueName, messageId);
        System.out.println("[VirtualHost] basicAck成功!消息被成功确认!queueName" + queueName +
                           ", messageId=" + messageId);
        return true;
    } catch (Exception e) {
        System.out.println("[VirtualHost] basicAck失败!消息确认失败!queueName=" + queueName +
                           ", messageId=" + messageId);
        e.printStackTrace();
        return false;
    }
}

消息确认是为了保证消息的不丢失,而需要的逻辑

    1. 执行回调方法的过程中,抛异常了~
    • 当回调函数异常,后续逻辑执行不到了。此时这个消费就会始终待在待确认集合中。RabbitMQ中会设置一个死信队列,每一个队列都会绑定一个死信队列。应用场景:当消息在消费过程中出现异常,就会把消息投入到死信队列中;当消息设置了过期时间,如果在过期时间内,没有被消费,就会投入到死信队列中;当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。
    1. 执行回调过程中, Broker Server崩溃了~内存数据都没了!但是硬盘数据还在,正在消费的这个消息,在硬盘中仍然存在。BrokerServer重启后,这个消息就又被加载到内存了,就像从来没被消费过一样。消费者就会有机会重新得到这个消息。

网络通信设计

基于TCP,自定义应用层协议

image.png

  • type:描述当前这个请求和响应,是干啥的。用四个字节来存储
    • 在MQ中,客户端(生产者 + 消费者)和 服务器 (Broker Server)之间,要进行哪些操作?(就是VirtualHost中的那些核心API)
    • 希望客户端,能通过网络远程调用这些API
    • 此处的type就是描述当前这个请求/响应是在调用哪个APIimage.png
    • TCP是有连接的. Channel 是 Connection 内部的逻辑连接。此时一个 Connection 中可能有多个连接,
    • 存在的意义是让 TCP 连接得到复用(创建/断开TCP连接成本挺高【需要三次握手,四次挥手~】)
  • length:里面存储的是 payload的长度。用4个字节来存储
  • payload:会根据当前是请求还是响应,以及当前的 type 有不同的值
    • 比如 type 是 0x3(创建交换机),同时当前是个请求,此时 payload 的内容,就相当于是 exchangeDelcare 的参数的序列化结果
    • 比如 type 是 0x3(创建交换机),同时当前是个响应,此时 payload 的内容,就相当于是 exchangeDelcare 的返回结果的序列化内容
ExchangeDelcare

请求 Request

image.png

响应 Response

image.png
image.png

ExchangeDelete

请求 Request

image.png

响应 Response

image.png

QueueDelcare

请求 Request

image.png

响应 Response

image.png

QueueDelete

请求 Request

image.png

响应 Response

image.png

QueueBind

请求 Request

image.png

响应 Response

image.png

QueueUnBind

请求 Request

image.png

响应 Response

image.png

BasicPublish

请求 Request

image.png

响应 Response

image.png

BasicConsume

请求 Request

image.png

响应 Response

image.png

BasicAck

请求 Request

image.png

响应 Response

image.png

创建BrokerServer类

消息队列本体服务器(本质上就是一个 TCP 的服务器)

image.png

实现读取请求和写回响应

  • 读取请求
private Request readRequest(DataInputStream dataInputStream) throws IOException {
    Request request = new Request();
    request.setType(dataInputStream.readInt());
    request.setLength(dataInputStream.readInt());
    byte[] payload = new byte[request.getLength()];
    int n = dataInputStream.read(payload);
    if (n != request.getLength()) {
        throw new IOException("读取格式出错!");
    }
    request.setPayload(payload);
    return request;
}
  • 写回响应
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
    dataOutputStream.writeInt(response.getType());
    dataOutputStream.writeInt(response.getLength());
    dataOutputStream.write(response.getPayload());
    dataOutputStream.flush();
}

清理过期和会话

image.png

private void clearClosedSession(Socket clientSocket) {
//  这里要做的,主要遍历 上述 sessions 哈希表,把该关闭的 socket 对应的键值对,全部删掉
List<String> toDeleteChannelId = new ArrayList<>();
for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
    if (entry.getValue() == clientSocket) {
        // 不能直接删除
        // 这属于集合类的大忌,一边遍历,一边删除
        toDeleteChannelId.add(entry.getKey());
    }
}
for (String channelId : toDeleteChannelId) {
    sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成!被清理的 channelId=" + toDeleteChannelId);
}

客服端代码(mqclient)

ConnectionFactory 连接工厂

这个类持有服务器的地址
主要的功能就是:创建出连接 Connection 对象

@Data
public class ConnectionFactory {
    // brokerserver 的ip地址
    private String host;
    // brokerserver 的port
    private int port;
    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host,port);
        return connection;
    }
}

Connection 表示一个TCP连接
  • 持有 Socket 对象
  • 发送请求
  • 读取响应(创建一个扫描线程,由这个线程负责不停地从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 进行处理)
    • 如果 response.type == 0xc,则是服务器推送的消息
    • 利用 SubScribeReturns 来接收
    • 根据 channelId 找到相应的 channel对象
    • 利用线程池执行 channel 里面的回调函数
    • 如果是 response.type != 0xc,则当前响应是针对控制请求的响应
    • 利用 BasicReturns 来接收
    • 根据 BasicReturns 对象中的 channelId 在 channelMap中找到 channel对象
    • 并将 BasicReturns 存到 channel对象中的 basicReturnsMap 哈希表中
  • 创建一个 channel
    • 随机生成 C+UUID
    • 将当前对象存放到 Connection 管理 channel 的哈希表中
    • 然后将 这个命令 通过 connection 发送给 服务器
  • 管理多个 channel 对象
    • ConcurrentHashMap<String,Channel> channelMap
    • 每次创建一个 channel的时候,就存进去
channel 表示一个逻辑上的连接

一个客户端可以有多个模块。
每个模块都可以和 brokerserver之间建立”逻辑上的连接“ (channel)
这几个模块的 channel 彼此之间是相互不影响的
但是这几个 channel 复用了同一个 TCP 连接

还需要提供一系列的方法,去和服务器提供的核心API对应
(客户端提供的方法,方法的内部,就是发了一个特定的请求)

对于一个客户端的一次 Connection下,可能会有多个 channel,就是多个逻辑上的连接,那么如何区分响应?
例如有 channelA 和 channelB 。channelA发送的请求A,channelB发送的请求B。此时响应的顺序不会按照顺序返回,而且channelA也不用关系其他响应,只关心是否收到响应A。
所以此时需要在 channel 下用一个 basicReturns来存储当前 channle 的收到服务器的响应。当客户端connetion读取到响应时候,添加到 channel中 basicReturns

image.png

image.png

项目总结

写了一个消费者队列服务器。
核心功能就是提供了虚拟机、交换机、队列、消息等概念的管理;实现了三种典型的消息转发方式。
基于上述内容就可以实现 跨主机/服务器 之间的 生产者消费者模型了。

项目扩展:

  1. 虚拟主机的管理(建立虚拟主机表)
  2. 用户管理/用户认证(建立用户表,可在建立连接的时候或者建立channel)
  3. 交换机/队列,独占/自动删除/扩展参数
  4. 发送方确认(服务器返回响应,生产者收到后触发回调)
  5. 拒绝应答
  6. 死信队列(针对消息可靠性)
  7. 管理接口 & 管理页面

源码地址:MQ源码地址 可以配合文档一起看,更能快速了解

本文含有隐藏内容,请 开通VIP 后查看