前言:本项目是仿照RabbitMQ并基于SpringBoot + Mybatis + SQLite3实现的消息队列,该项目实现了MQ的核心功能:生产者、消费者、中间人、发布、订阅等。
源码链接:仿Rabbit MQ实现消息队列
目录
前言:本项目是仿照RabbitMQ并基于SpringBoot + Mybatis + SQLite3实现的消息队列,该项目实现了MQ的核心功能:生产者、消费者、中间人、发布、订阅等。
12.3 实现 readRequest / writeResponse
一、核心概念
关于消息队列,有几个重要的核心概念:
- 生产者(Producer) :负责将应用程序产生的数据转换成消息,并将这些消息推送到消息队列服务器上,以便消费者(Consumer)可以接收并处理这些消息。
- 消费者(Consumer):它的主要职责是监听特定的队列或主题,并对到达的消息执行必要的业务逻辑。
- 中间人(Broker):作为生产者(Producer)和消费者(Consumer)之间的中介,负责管理和协调消息的传递过程。
- 发布(Publish):生产者向中间人投递消息的过程。
- 订阅(Subscribe):哪些消费者要从这个中间人获取数据,这个注册的过程称为订阅。
在中间人(Broker)模块,又有以下几个概念:
- 虚拟机 (VirtualHost):类似于 MySQL 的 "database", 是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
- 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue。
- 队列 (Queue): 真正⽤来存储消息的部分,每个消费者决定⾃⼰从哪个 Queue 上读取消息。
- 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多" 关系,使⽤⼀个关联表就可以把这两个概念联系起来。
二、模块划分
明确需要做的工作:
实现生产者、消费者、Broker Server这三个部分。
针对生产者、消费者,主要实现的是客户端和服务器的网络通信部分。
给客户端提供一组 API,让客户端的业务代码来调用,通过网络通信的方式远程调用Broker Server上的方法。
实现Broker Server 内部的一些基本概念和API(虚拟主机、交换机、队列、绑定、消息)。
持久化(考虑到 SQLite 相比 MySQL 来说比较轻量,因此存储交换机、队列等这些实体用 SQLite,消息的存储使用文件进行管理)。
针对于上述所需要实现的模块,进行划分:
三、创建核心实体类
3.1 创建交换机(Exchange)
name | type | durable | autoDelete | arguments |
---|---|---|---|---|
交换机身份标识 | 交换机类型 | 是否持久化 | 是否自动删除 | 额外参数选项 |
@Data
public class Exchange {
//交换机的身份标识(唯一)
private String name;
//交换机类型 direct fanout topic
private ExchangeType type = ExchangeType.DIRECT;
//表示该交换机是否要持久化存储. true 表示需要持久化. false 表示不需要持久化
private boolean durable = false;
//如果当前交换机,无客户端使用,就自动删除
private boolean autoDelete = false;
//表示创建交换机时指定的一些额外的参数选项
private Map<String,Object> arguments = new HashMap<>();
}
此处省略 arguments 存储数据库时的Json转换,只需要使用 ObjectMapper即可实现。关于交换机的类型,此次主要实现了以下三种:DIRECT、FANOUT、TOPIC。并使用枚举类定义:
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type){
this.type = type;
}
public int getType() {
return type;
}
}
3.2 创建队列实体类(MSGQueue)
name | durable | exclusive | autoDelete | arguments | consumerEnvList | consumerSeq |
---|---|---|---|---|---|---|
队列标识 | 是否持久化 | 是否独占 | 是否自动删除 | 额外参数选项 | 当前订阅的消费者列表 | 记录当前取到第几个消费者 |
@Data
public class MSGQueue {
//表示队列的身份标识
private String name;
//表示队列是否持久化 true:需要持久化 false:不需要持久化
private boolean durable = false;
//表示是否独占,true:独占 false:都可以使用
private boolean exclusive = false;
//表示无客户端使用是,是否自动删除
private boolean autoDelete = false;
//表示扩展参数
private Map<String,Object> arguments = new HashMap<>();
//当前队列都有哪些消费者订阅了.
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//记录当前取到的第几个消费者,方便实现轮询策略
private AtomicInteger consumerSeq = new AtomicInteger(0);
}
3.3 创建绑定实体类(Binding)
exchangeName | queueName | bindingKey |
---|---|---|
交换机名字 | 队列名字 | 绑定(和 routingKey匹配) |
@Data
public class Binding {
// 交换机名字
private String exchangeName;
//队列名字
private String queueName;
// 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配
private String bindingKey;
}
3.4 创建消息实体类(Message)
消息存储为二进制形式,因此对消息的存储需要进行序列化处理,所以 Message 类要实现Serializable 接口。
basicProperties | body | offsetBeg | offsetEnd | isValid |
---|---|---|---|---|
消息的属性 | 消息的正文 | 消息开头在文件中的偏移量 | 消息末尾在文件中的偏移量 | 是否有效 |
@Data
public class Message implements Serializable {
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
//辅助属性,后续消息要存储在文件中
//一个文件存储很多消息 [offsetBeg, offsetEnd)
private transient long offsetBeg = 0;//消息数据的开头距离文件开头的位置偏移(字节)
private transient long offsetEnd = 0;//消息数据的结尾距离文件开头的位置偏移(字节)
private byte isValid = 0x1;//表示该消息在文件中是否是有效的消息,逻辑删除, 0x1:有效 0x0:无效
//创建一个工厂方法,让工厂方法去封装一下创建 Message 对象的过程
//这个方法创建的 Message 会自动生成一个唯一的 MessageId
public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){
Message message = new Message();
if(basicProperties != null){
message.setBasicProperties(basicProperties);
}
//此处生成的 MessageId 以 "M-" 为前缀, 方便区分
message.setMessageId("M-" + UUID.randomUUID());
message.setRoutingKey(routingKey);
message.body = body;
return message;
}
public String getMessageId(){
return basicProperties.getMessageId();
}
public void setMessageId(String messageId){
basicProperties.setMessageId(messageId);
}
public String getRoutingKey(){
return basicProperties.getRoutingKey();
}
public void setRoutingKey(String routingKey){
basicProperties.setRoutingKey(routingKey);
}
public int getDeliverMode(){
return basicProperties.getDeliverMode();
}
public void setDeliverMode(int deliverMode){
basicProperties.setDeliverMode(deliverMode);
}
}
对于消息的属性,使用一个实体类去表示:
messageId | routingKey | deliverMode |
---|---|---|
消息的唯一身份标识 | 和(bindingKey匹配) | 是否持久化 |
@Data
public class BasicProperties implements Serializable {
//消息的唯一身份标识,使用 UUID 作为 messageId
private String messageId;
/**
* 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名
* 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用)
* 如果当前的交换机类型是 TOPIC, 此时 routingKey 就表示和 bindingKey 进行匹配
*/
private String routingKey;
//表示消息是否持久化, 1: 不持久化; 2: 持久化
private int deliverMode = 1;
}
四、数据库操作
SQLite 只是把数据单纯的存储到⼀个⽂件中,因此在这里设定存储到 “./data/meta.db”文件。
实现创建表以及数据库操作(这里不再展示具体的SQL语句)
@Mapper
public interface MetaMapper {
//三个核心建表方法
void createExchangeTable();
void createQueueTable();
void createBindingTable();
//针对上述三个基本概念进行插入和删除
void insertExchange(Exchange exchange);
List<Exchange> selectAllExchanges();
void deleteExchange(@Param("exchangeName") String exchangeName);
void insertQueue(MSGQueue queue);
List<MSGQueue> selectAllQueues();
void deleteQueue(@Param("queueName") String queueName);
void insertBinding(Binding binding);
List<Binding> selectAllBindings();
void deleteBinding(Binding binding);
}
五、封装对数据库的操作
public class DataBaseManager {
//数据库初始化
public void init(){...}
//删除数据库
public void deleteDB(){...}
//判断数据库是否存在
private boolean checkDBExists(){...}
//建表操作
private void createTable(){...}
//创建默认数据,RabbitMQ 里默认也带有一个 匿名 的交换机,类型是 DIRECT
private void createDefaultData(){...}
//交换机的数据库操作:增删查
public void insertExchange(Exchange exchange){...}
public void deleteExchange(String exchangeName){...}
public List<Exchange> selectAllExchanges(){...}
//队列的数据库操作:增删查
public void insertQueue(MSGQueue queue){...}
public void deleteQueue(String queueName){...}
public List<MSGQueue> selectAllQueues(){...}
//Binding的数据库操作:增删查
public void insertBinding(Binding binding){...}
public void deleteBinding(Binding binding){...}
public List<Binding> selectAllBindings(){...}
}
六、消息的存储设计
6.1 设计思路及设定
同时,因为队列用来存储消息,因此这里约定:
- 给每个队列分配⼀个目录,目录的名字为 data + 队列名,形如 :./data/queueName
- 该目录中包含两个固定名字的⽂件:
queue_data.txt 消息数据⽂件, 用来保存消息内容。
queue_stat.txt 消息统计⽂件, 用来保存消息统计信息。(消息总个数/t有效消息数),这样设计主要时考虑到后续进行垃圾回收,方便判断进行GC的时机。
6.2 设定存储消息的格式
消息数据文件以二进制的形式存储在 queue_data.txt 文件中 ,为了方便进行消息的读取,这里进行这样的设定:
6.3 实现消息序列化工具
对于实现消息序列化,首先 Message 实体类要实现 Serializable 接口,接下来需要借助ByteArrayOutputStream 和 ObjectOutputStream 实现消息的序列化和反序列化:
public class BinaryTool {
/**
* 把对象序列化成一个字节数组
* @param object
* @return
*/
public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个变长的字节数组,
// 可以把 object 对象序列化的数据逐步写入导 byteArrayOutputStream 中,
// 再统一转成 byte[]
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){
// 此处的 writeObject 就会把 object 进行序列化,生成的字节数据就会写入到 objectOutputStream
// objectOutputStream 又关联到了 byteArrayOutputStream,最终结果写入到 byteArrayOutputStream 里
objectOutputStream.writeObject(object);
}
return byteArrayOutputStream.toByteArray();
}
}
/**
* 把一个字节数组反序列化成一个对象
* @param data
* @return
*/
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){
try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
object = objectInputStream.readObject();
}
}
return object;
}
}
七、实现文件管理消息
创建 MessageFileManager 类,这个类主要去实现消息统计文件的读写、消息数据文件的读写、创建存储消息的文件、消息的垃圾回收等等。
下面这段代码是 MessageFileManager 的基础代码,实现文件的读写和垃圾回收都需要调用下面的方法:
public class MessageFileManager {
//定义一个内部类,来表示该队列的统计信息
static public class Stat{
public int totalCount; // 总消息数量
public int validCount; // 有效消息数量
}
public void init(){
}
//约定消息文件所在的目录和文件名
// 所在路径及文件名: ./data/队列名/queue_data.txt(消息数据文件)
// ./data/队列名/queue_stat.txt(消息统计文件)
/**
* 这个方法用来获取指定队列对应的消息文件所在路径
* @param queueName
* @return
*/
private String getQueueDir(String queueName){
return "./data/" + queueName;
}
/**
* 这个方法用来获取该队列的消息数据文件路径
* @param queueName
* @return
*/
private String getQueueDataPath(String queueName){
return getQueueDir(queueName) + "/queue_data.txt";
}
/**
* 这个方法用来获取指定队列的消息统计文件的路径
* @param queueName
* @return
*/
private String getQueueStatPath(String queueName){
return getQueueDir(queueName) + "/queue_stat.txt";
}
7.1 读取消息统计文件
private Stat readStat(String queueName){
Stat stat = new Stat();
try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
Scanner scanner = new Scanner(inputStream);
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
7.2 写消息统计文件
private void writeStat(String queueName, Stat stat){
// 使用 PrintWriter 写文件
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
7.3 创建队列对应的文件和目录
public void createQueueFiles(String queueName) throws IOException {
//1.创建队列对应的消息目录
File baseDir = new File(getQueueDir(queueName));
if(!baseDir.exists()){
boolean success = baseDir.mkdirs();
if(!success){
throw new IOException("创建目录失败! baseDir :" + baseDir.getAbsolutePath());
}
}
//2.创建消息数据文件
File queueDataFile = new File(getQueueDataPath(queueName));
if(!queueDataFile.exists()){
boolean success = queueDataFile.createNewFile();
if(!success){
throw new IOException("创建消息数据文件失败! queueDataFile: " + queueDataFile.getAbsolutePath());
}
}
//3.创建消息统计文件
File queueStatFile = new File(getQueueStatPath(queueName));
if(!queueStatFile.exists()){
boolean success = queueStatFile.createNewFile();
if(!success){
throw new IOException("创建消息统计文件失败! queueStatFile: " + queueStatFile.getAbsolutePath());
}
}
//4.给消息统计文件设置初始值
Stat stat = new Stat();
stat.totalCount = 0;
stat.validCount = 0;
writeStat(queueName,stat);
}
7.4 删除队列对应的文件和目录
public void destroyQueueFiles(String queueName) throws IOException {
//先删除文件,再删除目录
File queueDataFile = new File(getQueueDataPath(queueName));
boolean success1 = queueDataFile.delete();
File queueStatFile = new File(getQueueStatPath(queueName));
boolean success2 = queueStatFile.delete();
File baseDir = new File(getQueueDir(queueName));
boolean success3 = baseDir.delete();
if(!success1 || !success2 || !success3){
//删除失败
throw new IOException("删除队列目录和消息文件失败! baseDir: " + baseDir.getAbsolutePath());
}
}
7.5 判断队列的目录和消息文件是否存在
public boolean checkFileExists(String queueName){
//判断队列的 消息数据文件 和 消息统计文件 是否都存在
File queueDataFile = new File(getQueueDataPath(queueName));
File queueStatFile = new File(getQueueStatPath(queueName));
if(queueDataFile.exists() && queueStatFile.exists()){
return true;
}
return false;
}
7.6 新的消息写入到文件中
步骤:
- 检查要写入的队列对应的文件是否存在
- 对 Message 对象进行序列化
- 获取当前消息数据文件的长度,由此来设置当前要写入的消息的 offsetBeg 和 offsetEnd。
offsetBeg = 消息数据文件长度 + 4
offsetEnd = 消息数据文件长度 + 4 + 该消息序列化后的 byte 数组的长度
- 写入消息数据文件,更新消息统计文件
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 检查当前要写入的队列对应的文件是否存在
if(!checkFileExists(queue.getName())){
throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName: " + queue.getName());
}
// 把 Message 对象进行序列化,转成二进制的字节数组
byte[] messageBinary = BinaryTool.toBytes(message);
// 避免出现线程安全问题,即多个消息同时都往一个消息队列里面写消息
synchronized (queue){
// 获取到队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据写入到数据文件的末尾, 此时 Message 对象的 offsetBeg, 就是当前文件长度 + 4
// offsetEnd 就是当前文件长度 + 4 + message长度
File queueDateFile = new File(getQueueDataPath(queue.getName()));
message.setOffsetBeg(queueDateFile.length() + 4);
message.setOffsetEnd(queueDateFile.length() + 4 + messageBinary.length);
// 写入消息数据文件, 此处是追加写
try (OutputStream outputStream = new FileOutputStream(queueDateFile, true)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
// 先写消息长度,占据 4 个字节
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(),stat);
}
}
7.7 删除队列对应的消息数据文件中的消息
这里的删除采用逻辑删除,即把 Message 对象从文件中读取出来之后,把 valid 属性设置成 0 ,再重新写入并更新消息统计文件。
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
synchronized (queue){
try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()),"rw")){
// 读取对应的 Message 数据.
byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.read(bufferSrc);
// 转换为 Message 对象
Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);
diskMessage.setIsValid((byte) 0x0);
// 重新写入文件
byte[] bufferDest = BinaryTool.toBytes(diskMessage);
randomAccessFile.seek(message.getOffsetBeg());
randomAccessFile.write(bufferDest);
}
// 更新统计文件
Stat stat = readStat(queue.getName());
if(stat.validCount > 0){
stat.validCount -= 1;
}
writeStat(queue.getName(),stat);
}
}
7.8 从文件中读取消息到内存中
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
LinkedList<Message> messages = new LinkedList<>();
long currentOffset = 0; // 使用这个变量记录光标的位置
try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
try (DataInputStream dataInputStream = new DataInputStream(inputStream)){
while (true){
//读取一条消息长度
int messageSize = dataInputStream.readInt();
// 按照长度读取消息内容
byte[] buffer = new byte[messageSize];
int actualSize = dataInputStream.read(buffer);
if(messageSize != actualSize){
//不匹配说明文件有问题
throw new MqException("[MessageFileManager] 文件格式错误! queueName: " + queueName);
}
//反序列化
Message message = (Message) BinaryTool.fromBytes(buffer);
//判断是否是无效数据
if (message.getIsValid() != 0x1){
currentOffset += (4 + messageSize);
continue;
}
//有效数据,加入到链表中
message.setOffsetBeg(currentOffset + 4);
message.setOffsetEnd(currentOffset + 4 + messageSize);
currentOffset += (4 + messageSize);
messages.add(message);
}
}catch (EOFException e){
System.out.println("[MessageFileManager] 恢复 Message 数据完成!");
}
return messages;
}
}
7.9 判断垃圾回收时机
这里的数字是拍脑门写的,当消息总数大于 2000 并且 消息的有效个数小于 50% 时,进行垃圾回收。
public boolean checkGC(String queueName){
// 读取消息统计文件的数据
Stat stat = readStat(queueName);
if(stat.totalCount > 2000 && (double)stat.validCount / stat.totalCount < 0.5){
return true;
}
return false;
}
7.10 获取新消息数据文件的路径
public String getQueueDataNewPath(String queueName){
return getQueueDir(queueName) + "/queue_data_new.txt";
}
7.11 消息的垃圾回收
这里我采用了复制算法进行垃圾回收,具体实现步骤:
- 创建一个新的文件,命名为 queue_data_new.txt
- 把之前消息数据文件的有效消息读取并写到新的文件中
- 删除旧的消息数据文件,进行文件重命名,同时记录消息统计文件
public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
synchronized (queue){
long gcBeg = System.currentTimeMillis();
//创建一个新的文件
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if(queueDataNewFile.exists()){
throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new.txt 已经存在! queueName: " + queue.getName());
}
boolean success = queueDataNewFile.createNewFile();
if(!success){
throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile: " + queueDataNewFile.getAbsolutePath());
}
// 从旧的消息文件中读取所有的有效数据文件
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 把有效的消息全部写入到新的文件中
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
for (Message message : messages){
byte[] buffer = BinaryTool.toBytes(message);
//先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
// 再写消息的内容
dataOutputStream.write(buffer);
}
}
}
// 删除旧的数据文件,进行文件重命名
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
success = queueDataOldFile.delete();
if(!success){
throw new MqException("[MessageFileManager] 旧的数据文件删除失败! queueDataOldFile: " + queueDataOldFile.getAbsolutePath());
}
success = queueDataNewFile.renameTo(queueDataOldFile);
if(!success){
throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile: " + queueDataNewFile.getAbsolutePath() +
", queueDataOldFile: " + queueDataOldFile.getAbsolutePath());
}
// 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(),stat);
long gcEnd = System.currentTimeMillis();
System.out.println("[MessageFileManager] gc 执行完毕! queueName: " + queue.getName() +
", time: " + (gcEnd - gcBeg) + "ms");
}
}
八、统一硬盘处理
消息存储在文件中,交换机、绑定、队列存储在数据库中,对此进行统一处理。也就是说使用一个类管理所有硬盘上的数据。
public class DiskDataCenter {
//这个实例用来管理数据库的数据
private DataBaseManager dataBaseManager = new DataBaseManager();
//这个实例用来管理文件中的数据
private MessageFileManager messageFileManager = new MessageFileManager();
public void init(){
dataBaseManager.init();
messageFileManager.init();
}
/**
* 封装交换机、绑定、队列操作
* @param exchange
*/
public void insertExchange(Exchange exchange){
dataBaseManager.insertExchange(exchange);
}
public void deleteExchange(String exchangeName){
dataBaseManager.deleteExchange(exchangeName);
}
public List<Exchange> selectAllExchanges(){
return dataBaseManager.selectAllExchanges();
}
//封装队列操作
public void insertQueue(MSGQueue queue) throws IOException {
messageFileManager.createQueueFiles(queue.getName());
dataBaseManager.insertQueue(queue);
}
public void deleteQueue(String queueName) throws IOException {
messageFileManager.destroyQueueFiles(queueName);
dataBaseManager.deleteQueue(queueName);
}
public List<MSGQueue> selectAllQueues(){
return dataBaseManager.selectAllQueues();
}
//封装绑定操作
public void insertBinding(Binding binding){
dataBaseManager.insertBinding(binding);
}
public void deleteBinding(Binding binding){
dataBaseManager.deleteBinding(binding);
}
public List<Binding> selectAllBindings(){
return dataBaseManager.selectAllBindings();
}
/**
* 封装消息操作
*/
public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
messageFileManager.sendMessage(queue,message);
}
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {
messageFileManager.deleteMessage(queue,message);
if(messageFileManager.checkGC(queue.getName())){
messageFileManager.gc(queue);
}
}
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
return messageFileManager.loadAllMessageFromQueue(queueName);
}
}
九、内存管理
这里主要使用了线程安全的哈希表保存内存中的消息、交换机、绑定、队列等。
public class MemoryDataCenter {
// key 是 exchangeName value 是 Exchange 对象
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
// key 是 queueName, value 是 MSGQueue 对象
private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
// 第一个 key 是 exchangeName, 第二个 key 是 queueName
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
// key 是 messageId, value 是 Message 对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
// key 是 queueName, value 是 Message 的链表
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
// 第一个 key 是 queueName 第二个 key 是 messageId
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
}
9.1 交换机相关操作的API
- 新增交换机
public void insertExchange(Exchange exchange){
exchangeMap.put(exchange.getName(), exchange);
System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName: " + exchange.getName());
}
- 查询交换机
public Exchange getExchange(String exchangeName){
return exchangeMap.get(exchangeName);
}
- 删除交换机
public void deleteExchange(String exchangeName){
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 删除交换机成功! exchangeName: " + exchangeName);
}
9.2 队列相关操作的API
- 新增队列
public void insertQueue(MSGQueue queue){
queueMap.put(queue.getName(), queue);
System.out.println("[MemoryDataCenter] 新队列添加成功! queueName: " + queue.getName());
}
- 查询队列
public MSGQueue getQueue(String queueName){
return queueMap.get(queueName);
}
- 删除队列
public void deleteQueue(String queueName){
queueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 删除队列成功成功! queueName: " + queueName);
}
9.3 绑定相关操作的API
- 新增绑定
public void insertBinding(Binding binding) throws MqException {
//先使用 exchangeName 查一下对应的哈希表是否存在
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());
synchronized (bindingMap){
//再根据 queueName 查一下,如果已经存在,就抛出异常,不存在才能插入
if(bindingMap.get(binding.getQueueName()) != null){
throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName: " +
binding.getExchangeName() + ", queueName: " + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(),binding);
}
System.out.println("[MemoryDataCenter] 新绑定添加成功! queueName: " + binding.getQueueName() + ", exchangeName: "
+ binding.getExchangeName());
}
- 根据交换机名和队列名查询绑定
public Binding getBinding(String exchangeName,String queueName){
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);
if(bindingMap == null){
return null;
}
return bindingMap.get(queueName);
}
- 查询交换机绑定的所有队列
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){
return bindingsMap.get(exchangeName);
}
- 删除绑定
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if(bindingMap == null){
//无法删除
throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName: " + binding.getExchangeName() +
", queueName: " + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功! queueName: " + binding.getQueueName() + ", exchangeName: "
+ binding.getExchangeName());
}
9.4 消息相关操作的API
- 添加消息
public void addMessage(Message message){
messageMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 新消息添加成功! messageId: " + message.getMessageId());
}
- 查询消息
public Message getMessage(String messageId){
return messageMap.get(messageId);
}
- 删除消息
public void removeMessage(String messageId){
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息成功移除! messageId: " + messageId);
}
- 发送消息到指定队列
public void sendMessage(MSGQueue queue, Message message){
//把消息放到指定的数据结构中
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
synchronized (messages){
messages.add(message);
}
addMessage(message);
System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId: " + message.getMessageId());
}
- 从队列中获取消息
public Message pollMessage(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null){
return null;
}
synchronized (messages){
if(messages.size() == 0){
return null;
}
//取头元素
Message curMessage = messages.remove(0);
System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId: " + curMessage.getMessageId());
return curMessage;
}
}
- 获取指定队列中的消息个数
public int getMessageCount(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
if(messages == null){
return 0;
}
synchronized (messages){
return messages.size();
}
}
- 添加未确认的消息
public void addMessageWaitAck(String queueName,Message message){
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());
messageHashMap.put(message.getMessageId(),message);
System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId: " + message.getMessageId());
}
- 删除已经确认的消息
public void removeMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if(messageHashMap == null){
return;
}
messageHashMap.remove(messageId);
System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId: " + messageId);
}
- 获取未确认的消息
public Message getMessageWaitAck(String queueName, String messageId){
ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
if(messageHashMap == null){
return null;
}
return messageHashMap.get(messageId);
}
- 从硬盘读取数据恢复到内存中
public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
// 1. 恢复所有的交换机数据
exchangeMap.clear();
queueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageMap.clear();
List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
for (Exchange exchange : exchanges){
exchangeMap.put(exchange.getName(), exchange);
}
// 2. 恢复所有的队列数据
List<MSGQueue> queues = diskDataCenter.selectAllQueues();
for(MSGQueue queue : queues){
queueMap.put(queue.getName(), queue);
}
// 3. 恢复所有的绑定数据
List<Binding> bindings = diskDataCenter.selectAllBindings();
for(Binding binding : bindings){
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(),binding);
}
// 4. 恢复所有的消息数据
// 遍历所有的队列 根据每个队列的名字获取到所有的消息
for(MSGQueue queue : queues){
LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
queueMessageMap.put(queue.getName(), messages);
for(Message message : messages){
messageMap.put(message.getMessageId(), message);
}
}
// 针对未确认的消息不需要从硬盘上恢复,一旦在等待 ack 的过程中服务器重启,此时被恢复成未被取走的消息
}
十、虚拟机 VirtualHost
每个虚拟机下都管理着自己的交换机、队列、绑定、消息这些数据,同时供上层API进行调用,本项目目前只支持单个交换机。
public class VirtualHost {
private String virtualHostName;
private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
private DiskDataCenter diskDataCenter = new DiskDataCenter();
private Router router = new Router();
private ConsumerManager consumerManager = new ConsumerManager(this);
private final Object exchangeLocker = new Object();// 操作交换机的锁对象
private final Object queueLocker = new Object(); // 操作队列的锁对象
public VirtualHost(String name){
this.virtualHostName = name;
//对于 MemoryDataCenter 来说, 不需要初始化
//针对 DiskDataCenter 来说,需要进行初始化操作,建库建表和初始数据的设定
//此外,针对硬盘的数据,恢复到内存中
diskDataCenter.init();
try {
memoryDataCenter.recovery(diskDataCenter);
} catch (IOException | MqException | ClassNotFoundException e) {
e.printStackTrace();
System.out.println("[VirtualHost] 恢复内存数据失败!");
}
}
public String getVirtualHostName() {
return virtualHostName;
}
public MemoryDataCenter getMemoryDataCenter() {
return memoryDataCenter;
}
public DiskDataCenter getDiskDataCenter() {
return diskDataCenter;
}
其中,Router 类规定了交换机转发的规则:
public class Router {
/**
* bindingKey 的构造规则
* 1.数字、字母、下划线
* 2.使用 . 分割成若干部分
* 3.允许存在 * 和 # 作为通配符,但是通配符只能作为一个独立的分段
* @param bindingKey
* @return
*/
public boolean checkBindingKey(String bindingKey){}
/**
* routingKey 的构造规则:
* 1.数字、字母、下划线
* 2.使用 . 分割成若干部分
*/
public boolean checkRoutingKey(String routingKey){}
/**
* 判定该消息是否可以转发给这个绑定对应的队列
* @param exchangeType
* @param binding
* @param message
* @return
*/
public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException{}
/**
* 校验 bindingKey 和 routingKey 是否匹配
* @param binding
* @param message
* @return
*/
private boolean routeTopic(Binding binding,Message message){}
}
10.1 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
Map<String,Object> arguments){
//把交换机的名字加上虚拟主机的名字
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
//判定该交换机是否已经存在,直接通过内存查询
Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);
if(existsExchange != null){
//该交换机已经存在
System.out.println("[VirtualHost] 交换机已经存在! exchangeName: " + exchangeName);
return true;
}
Exchange exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arguments);
if(durable){
diskDataCenter.insertExchange(exchange);
}
memoryDataCenter.insertExchange(exchange);
System.out.println("[VirtualHost] 交换机创建完成! exchangeName: " + exchangeName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 交换机创建失败! exchangeName: " + exchangeName);
e.printStackTrace();
return false;
}
}
10.2 删除交换机
public boolean exchangeDelete(String exchangeName){
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
Exchange toDelete = memoryDataCenter.getExchange(exchangeName);
if(toDelete == null){
throw new MqException("[VirtualHost] 交换机不存在,无法删除!");
}
if(toDelete.isDurable()){
diskDataCenter.deleteExchange(exchangeName);
}
memoryDataCenter.deleteExchange(exchangeName);
System.out.println("[VirtualHost] 交换机删除成功! exchangeName: " + exchangeName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 交换机删除失败! exchangeName: " + exchangeName);
e.printStackTrace();
return false;
}
}
10.3 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
Map<String,Object> arguments){
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker){
//1.判断队列是否存在
MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
if(existsQueue != null){
System.out.println("[VirtualHost] 队列已经存在! queueName: " + queueName);
return false;
}
//2.构造队列对象
MSGQueue queue = new MSGQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);
//3.插入硬盘
if(queue.isDurable()){
diskDataCenter.insertQueue(queue);
}
//4.插入内存
memoryDataCenter.insertQueue(queue);
System.out.println("[VirtualHost] 队列创建成功! queueName: " + queueName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 创建队列失败! queueName: " + queueName);
e.printStackTrace();
return false;
}
}
10.4 删除队列
public boolean queueDelete(String queueName){
queueName = virtualHostName + queueName;
try {
synchronized (queueLocker){
MSGQueue toDelete = memoryDataCenter.getQueue(queueName);
if (toDelete == null){
throw new MqException("[VirtualHost] 队列不存在,无法删除!");
}
if(toDelete.isDurable()){
diskDataCenter.deleteQueue(queueName);
}
memoryDataCenter.deleteQueue(queueName);
System.out.println("[VirtualHost] 队列删除成功! queueName: " + queueName);
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 队列删除失败! queueName: " + queueName);
e.printStackTrace();
return false;
}
}
10.5 创建绑定
public boolean queueBind(String queueName,String exchangeName,String bindingKey){
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
synchronized (queueLocker){
//1.判断绑定是否存在
Binding existsBinding = memoryDataCenter.getBinding(exchangeName,queueName);
if(existsBinding != null){
throw new MqException("[VirtualHost] 绑定已经存在! queueName: " + queueName + ", exchangeName: "
+ exchangeName);
}
//2.判断绑定是否合法
if(!router.checkBindingKey(bindingKey)){
throw new MqException("[VirtualHost] bindingKey 不合法! bindingKey: " + bindingKey);
}
//3.创建 Binding 对象
Binding binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
//4.获取到对应的交换机和队列,如果对应的交换机和队列不存在,这样的绑定是无法创建的
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 该绑定对应的队列不存在! queueName: " + queueName);
}
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange == null){
throw new MqException("[VirtualHost] 该绑定对应的交换机不存在! exchangeName: " + exchangeName);
}
if(queue.isDurable() && exchange.isDurable()){
diskDataCenter.insertBinding(binding);
}
memoryDataCenter.insertBinding(binding);
System.out.println("[VirtualHost] 创建绑定成功! queueName: " + queueName + ", exchangeName: "
+ exchangeName);
}
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 绑定创建失败! queueName: " + queueName + ", exchangeName: "
+ exchangeName);
e.printStackTrace();
return false;
}
}
10.6 删除绑定
public boolean queueUnbind(String queueName, String exchangeName){
queueName = virtualHostName + queueName;
exchangeName = virtualHostName + exchangeName;
try {
synchronized (exchangeLocker){
synchronized (queueLocker){
//1.判断绑定是否存在
Binding toDelete = memoryDataCenter.getBinding(exchangeName,queueName);
if(toDelete == null){
throw new MqException("[VirtualHost] 绑定不存在,无法删除! queueName: " + queueName
+ ", exchangeName: " + exchangeName);
}
//2.无论绑定是否持久化,都尝试在硬盘上删一下,就算不存在,这个删除操作也没有副作用
diskDataCenter.deleteBinding(toDelete);
memoryDataCenter.deleteBinding(toDelete);
System.out.println("[VirtualHost] 删除绑定成功! queueName: " + queueName + ", exchangeName: " + exchangeName);
}
}
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 删除绑定失败!");
return false;
}
}
10.7 发送消息到指定的队列
实现步骤:
- 检查 routingKey 是否合法
- 判断交换机是否存在
- 判断交换机的类型,根据不同的类型决定如何进行后续转发
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body){
try {
//1.转换交换机的名字
exchangeName = virtualHostName + exchangeName;
//2.检查 routingKey 是否合法
if(!router.checkRoutingKey(routingKey)){
throw new MqException("[virtualHost] routingKey 非法! routingKey: " + routingKey);
}
//3.查找交换机对象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange == null){
throw new MqException("[virtualHost] 交换机不存在! exchangeName: " + exchangeName);
}
//4.判定交换机的类型
if(exchange.getType() == ExchangeType.DIRECT){
//按照直接交换机的方式转发消息
//以 routingKey 作为队列的名字,直接把消息写入指定的队列中
//此时,可以无视绑定关系
String queueName = virtualHostName + routingKey;
//5.构造消息对象
Message message = Message.createMessageWithId(routingKey,basicProperties,body);
//6.查找队列名对应的对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[virtualHost] 队列不存在! queueName: " + queueName);
}
//7.队列存在,给队列中写入消息
sendMessage(queue,message);
}else{
//按照 fanout 和 topic 的方式来转发
//5.找到该交换机关联的所有绑定
ConcurrentHashMap<String,Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);
for(Map.Entry<String,Binding> entry : bindingsMap.entrySet()){
// 1) 获取到绑定队列,判定对应的队列是否存在
Binding binding = entry.getValue();
MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if(queue == null){
System.out.println("[virtualHost] basicPublish 发送消息时,发现队列不存在! queueName: " + binding.getQueueName());
continue;
}
// 2) 构造消息对象
Message message = Message.createMessageWithId(routingKey,basicProperties,body);
// 3) 判定这个消息是否能转发给该队列
// 如果是 fanout, 所有绑定的队列都要进行转发
// 如果是 topic, 需要判定 bindingKey 和 routingKey 是否匹配
if(!router.route(exchange.getType(),binding,message)){
continue;
}
// 4) 真正转发消息给队列
sendMessage(queue,message);
}
}
return true;
}catch (Exception e){
System.out.println("[virtualHost] 消息发送失败!");
e.printStackTrace();
return false;
}
}
上述过程中涉及到的被调用的 API:
private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
//此处发送消息,就是调用之前封装好的 api, 写到内容和硬盘上
int deliverMode = message.getDeliverMode();
//deliverMode 为 1 不持久化
//deliverMode 为 2 要持久化
if(deliverMode == 2){
diskDataCenter.sendMessage(queue,message);
}
//写入内存
memoryDataCenter.sendMessage(queue,message);
//通知消费者可以消费消息了
consumerManager.notifyConsume(queue.getName());
}
10.8 订阅消息
/**
* 订阅消息
* 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
* @param consumerTag 消费者的身份标识
* @param queueName
* @param autoAck 消息被消费完成后,应答的方式 true:自动应答; false:手动应答
* @param consumer 是一个回调函数,此处类型设定为函数式接口,后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 表达式
* @return
*/
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){
//构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 ConsumerEnv 对象添加到该队列中
queueName = virtualHostName + queueName;
try {
consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
System.out.println("[virtualHost] basicConsume 成功! queueName: " + queueName);
return true;
}catch (Exception e){
System.out.println("[virtualHost] basicConsume 失败! queueName: " + queueName);
e.printStackTrace();
return false;
}
}
上述过程涉及到了 ConsumerManager 类:
public class ConsumerManager {
//持有上层 VirtualHost 对象的引用,用来操作数据
private VirtualHost parent;
//指定一个线程池,执行具体的回调任务
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//存放令牌的队列
private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();
//扫描线程
private Thread scannerThread = null;
public ConsumerManager(VirtualHost p){
this.parent = p;
scannerThread = new Thread(() ->{
while (true){
try {
//1.拿到令牌
String queueName = tokenQueue.take();
//2.根据令牌找到队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if(queue == null){
throw new MqException("[ConsumerManager] 取令牌时队列名不存在! queueName: " + queueName);
}
//3.从队列中消费一个消息
synchronized (queue){
consumeMessage(queue);
}
} catch (InterruptedException | MqException e) {
e.printStackTrace();
}
}
});
//线程设为后台线程
scannerThread.setDaemon(true);
scannerThread.start();
}
/**
* 调用时机:发送消息的时候
* @param queueName
* @throws InterruptedException
*/
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//找到对应的队列
MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
if(queue == null){
throw new MqException("[ConsumerManager] 队列不存在! queueName: " + queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
synchronized (queue){
queue.addConsumerEnv(consumerEnv);
int n = parent.getMemoryDataCenter().getMessageCount(queueName);
for(int i = 0; i < n; i++){
//调用一次就消费一条消息
consumeMessage(queue);
}
}
}
/**
* 消费一个消息
* @param queue
*/
private void consumeMessage(MSGQueue queue) {
// 1.按照轮询的方式找个消费者出来
ConsumerEnv lucyDog = queue.chooseConsumer();
if(lucyDog == null){
return;//当前队列没有消费者,暂时不消费
}
//2.从队列中取出一个消息
Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
if(message == null){
return;//当前队列中没有消息,也不需要消费
}
//3.把消息带入到消费者的回调方法中,让线程池去执行
workerPool.submit(() ->{
try {
// 1) 先把消息放到待确认的集合中
parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
// 2) 执行回调
lucyDog.getConsumer().handleDelivery(lucyDog.getConsumerTag(),message.getBasicProperties(),
message.getBody());
// 3) 如果当前是 自动应答,直接把消息删除
// 如果当前是手动应答,交给后续消费者调用 basicAck 来处理
if(lucyDog.isAutoAck()){
//删除硬盘上的消息
if(message.getDeliverMode() == 2){
//当前这个消息是持久化存储,需要删除硬盘上的消息
parent.getDiskDataCenter().deleteMessage(queue,message);
}
//删除等待应答的消息
parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
//删除内存中的消息
parent.getMemoryDataCenter().removeMessage(message.getMessageId());
System.out.println("[ConsumerManager] 消息被成功消费! queueName: " + queue.getName());
}
}catch (Exception e){
e.printStackTrace();
}
});
}
}
10.9 消息确认
public boolean basicAck(String queueName, String messageId){
queueName = virtualHostName + queueName;
try {
Message message = memoryDataCenter.getMessage(messageId);
if(message == null){
//要确认的消息不存在
throw new MqException("[VirtualHost] 要确认的消息不存在! messageId: " + messageId);
}
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 要确认的队列不存在! queueName: " + queueName);
}
//1.删除硬盘上的数据
if(message.getDeliverMode() == 2){
diskDataCenter.deleteMessage(queue,message);
}
//2.删除消息中心中的数据
memoryDataCenter.removeMessage(messageId);
//3.删除待确认集合中的消息
memoryDataCenter.removeMessageWaitAck(queueName,messageId);
System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName: " + queueName + ", messageId: " +
messageId);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] basicAck 失败! 消息被确认失败! queueName: " + queueName + ", messageId: " +
messageId);
e.printStackTrace();
return false;
}
}
十一、网络通信协议设计
生产者和消费者都是客户端程序,并且需要通过网络远程调用 BrokerServer 提供的 API, 这里,我使用 TCP 作为底层协议,在这个基础上自定义应用层协议,简单来说就是约定一下生产者以及消费者和 BrokerServer 之间交互的规范或者是传输数据的格式。
客户端要调用的功能有以下几个部分:
- 创建 channel
- 关闭 channel
- 创建 exchange
- 删除 exchange
- 创建 queue
- 删除 queue
- 创建 binding
- 删除 binding
- 发送 message
- 订阅 message
- 发送 ack
- 返回 message (服务器 -> 客户端)
11.1 设计应用层协议
因为 Message 本身就是二进制数据,因此这里同样使用二进制的方式设定协议。
其中 type 表示请求响应不同的功能,取值如下:
0x1 |
创建 channel
|
0x2 | 关闭 channel |
0x3 |
创建 exchange
|
0x4 |
销毁 exchange
|
0x5 |
创建 queue
|
0x6 |
销毁 queue
|
0x7 |
创建 binding
|
0x8 |
销毁 binding
|
0x9 |
发送 message
|
0xa |
订阅 message
|
0xb |
返回 ack
|
0xc |
服务器给客户端推送的消息
|
- 对于请求来说, payload 表示这次方法调用的各种参数信息
- 对于响应来说, payload 表示这次方法调用的返回值
11.2 定义Request/Response
/**
* @description:表示一个请求对象,按照自定义协议的格式展开
* @created by 清风 on 2024/7/30 21:21
*/
@Data
public class Request {
private int type;
private int length;
private byte[] payload;
}
/**
* @description:这是一个响应,也是根据自定义应用层协议来的
* @created by 清风 on 2024/7/30 21:22
*/
@Data
public class Response {
private int type;
private int length;
private byte[] payload;
}
11.3 定义参数父类
/**
* @description:使用这个类来表示方法的公共参数/辅助的字段
* 后续每个方法就会有不同的参数,不同的参数使用不同的子类表示
* @created by 清风 on 2024/7/30 21:24
*/
@Data
public class BasicArguments implements Serializable{
// 表示一次请求和一次响应的身份标识,可以把请求和响应对上
protected String rid;
//本次通信使用的 channel 的身份标识
protected String channelId;
}
11.4 定义返回值父类
/**
* @description:表示各个远程的调用的方法的返回值的公共信息
* @created by 清风 on 2024/7/30 21:28
*/
@Data
public class BasicReturns implements Serializable {
//用来标识唯一的请求和响应
protected String rid;
//用来标识一个 channel
protected String channelId;
//表示远程调用方法的返回值
protected boolean ok;
}
11.5 定义其他参数类
/**
* @description:这个类是创建交换机的请求参数的类
* @created by 清风 on 2024/7/30 21:33
*/
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
private String exchangeName;
private ExchangeType exchangeType;
private boolean durable;
private boolean autoDelete;
private Map<String,Object> arguments;
}
- 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图片的结构
- 按照 length 长度读取出 payload, 就可以把读到的⼆进制数据转换成 ExchangeDeclareArguments 对象
十二、实现 BrokerServer 类
public class BrokerServer {
private ServerSocket serverSocket = null;
//一个 BrokerServer 上只有一个虚拟主机
private VirtualHost virtualHost = new VirtualHost("default");
//存储当前的所有会话,也就是有哪些客户端正在和服务器进行通信
//此处的 key 是 channelId; value 为对应的 socket 对象
private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
//引入线程池来处理多个客户端的请求
private ExecutorService executorService = null;
//引入一个 boolean 变量控制服务器是否继续运行
private volatile boolean runnable = true;
}
12.1 启动/停止服务器
public BrokerServer(int port) throws IOException {
serverSocket = new ServerSocket(port);
}
public void start() throws IOException {
System.out.println("[BrokerServer] 启动!");
executorService = Executors.newCachedThreadPool();
try {
while (runnable){
Socket clientSocket = serverSocket.accept();
//把处理连接的逻辑丢给线程池
executorService.submit(() ->{
processConnection(clientSocket);
});
}
}catch (SocketException e){
System.out.println("[BrokerServer] 服务器停止运行!");
}
}
/**
* 停止服务器
*/
public void stop() throws IOException {
runnable = false;
executorService.shutdownNow();//把线程池中的任务都放弃,让线程都销毁
serverSocket.close();
}
12.2 实现处理连接
/**
* 通过这个方法来处理一个客户端的连接
* 在这个连接中,可能会涉及到多个请求和响应
* @param clientSocket
*/
private void processConnection(Socket clientSocket) {
try (InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream()){
//这里需要按照特定格式读取并解析,此时需要用到 DataInputStream 和 DataOutputStream
try (DataInputStream dataInputStream = new DataInputStream(inputStream);
DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
while (true){
//1.读取请求并解析
Request request = readRequest(dataInputStream);
//2.根据请求计算响应
Response response = process(request,clientSocket);
//3.把响应写回给客户端
writeResponse(dataOutputStream,response);
}
}catch (EOFException | SocketException e){
//对于当前代码, DataInputStream 如果读到 EOF, 就会抛出一个 EOFException 异常
//需要借助这个异常来结束循环
System.out.println("[BrokerServer] 连接关闭! 客户端地址: " + clientSocket.getInetAddress().toString()
+ ":" + clientSocket.getPort());
}
}catch (IOException | ClassNotFoundException | MqException e){
System.out.println("[BrokerServer] Connection 出现异常!");
e.printStackTrace();
}finally {
try {
//关闭 socket
clientSocket.close();
//一个 TCP 连接中,可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 清理掉
clearClosedSession(clientSocket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
12.3 实现 readRequest / writeResponse
private Request readRequest(DataInputStream dataInputStream) throws IOException {
Request request = new Request();
request.setType(dataInputStream.readInt());
request.setLength(dataInputStream.readInt());
byte[] payload = new byte[request.getLength()];
int n = dataInputStream.read(payload);
if(n != request.getLength()){
throw new IOException("读取请求格式出错!");
}
request.setPayload(payload);
return request;
}
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
dataOutputStream.writeInt(response.getType());
dataOutputStream.writeInt(response.getLength());
dataOutputStream.write(response.getPayload());
dataOutputStream.flush();
}
12.4 实现处理请求
- 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
- 再根据不同的 type, 分别处理不同的逻辑,(主要是调⽤ virtualHost 中不同的方法)
- 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客户端
- 最后构造成统⼀的响应
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
//1.把 request 中的 payload 做一个初步的解析
BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
System.out.println("[Request] rid: " + basicArguments.getRid() + ", channelId: " + basicArguments.getChannelId()
+ ", type: " + request.getType() + ", length: " + request.getLength());
//2.根据 type 的值来进一步区分接下来这次请求要干什么
boolean ok = true;
if(request.getType() == 0x1){
//创建 channel
sessions.put(basicArguments.getChannelId(), clientSocket);
System.out.println("[BrokerServer] 创建 channel 完成! channelId: " + basicArguments.getChannelId());
}else if(request.getType() == 0x2){
//销毁 channel
sessions.remove(basicArguments.getChannelId());
System.out.println("[BrokerServer] 销毁 channel 完成! channelId: " + basicArguments.getChannelId());
}else if(request.getType() == 0x3){
//创建一个交换机
//此时 payload 就是 ExchangeDeclareArguments 对象
ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(),
arguments.isAutoDelete(),arguments.getArguments());
}else if(request.getType() == 0x4){
//销毁交换机
ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
ok = virtualHost.exchangeDelete(arguments.getExchangeName());
} else if (request.getType() == 0x5) {
//创建队列
QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(),
arguments.isAutoDelete(),arguments.getArguments());
}else if(request.getType() == 0x6){
//销毁队列
QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
ok = virtualHost.queueDelete(arguments.getQueueName());
}else if(request.getType() == 0x7){
//创建 Binding
QueueBindArguments arguments = (QueueBindArguments) basicArguments;
ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
}else if(request.getType() == 0x8){
//删除 Binding
QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
}else if(request.getType() == 0x9){
//发送消息
BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
arguments.getBasicProperties(),arguments.getBody());
}else if(request.getType() == 0xa){
//订阅消息
BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
//这个回调函数要做的工作:把服务器收到的消息直接推送给对应的客户端
//先知道当前这个收到的消息要发给哪个客户端
//此处 consumerTag 其实是 channelId
//1.根据 channel 找到 Socket 对象
Socket clientSocket = sessions.get(consumerTag);
if(clientSocket == null || clientSocket.isClosed()){
throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
}
//2.构造响应数据
//此处 response 的 payload 就是 SubScribeReturns
SubScribeReturns subScribeReturns = new SubScribeReturns();
subScribeReturns.setConsumerTag(consumerTag);
subScribeReturns.setBasicProperties(basicProperties);
subScribeReturns.setChannelId(consumerTag);
subScribeReturns.setRid("");
subScribeReturns.setOk(true);
subScribeReturns.setBody(body);
byte[] payload = BinaryTool.toBytes(subScribeReturns);
Response response = new Response();
response.setType(0xc);//0xc 表示服务器给消费者客户端推送的数据
response.setLength(payload.length);
response.setPayload(payload);
//3.把数据写回给客户端
DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
writeResponse(dataOutputStream,response);
}
});
}else if(request.getType() == 0xb){
//调用 basicAck 来确认消息
BasicAckArguments arguments = (BasicAckArguments) basicArguments;
ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
}else {
//当前的 type 是非法的
throw new MqException("[BrokerServer] 未知的 type: " + request.getType());
}
//3.构造响应
BasicReturns basicReturns = new BasicReturns();
basicReturns.setChannelId(basicArguments.getChannelId());
basicReturns.setRid(basicArguments.getRid());
basicReturns.setOk(ok);
byte[] payload = BinaryTool.toBytes(basicReturns);
Response response = new Response();
response.setType(request.getType());
response.setLength(payload.length);
response.setPayload(payload);
System.out.println("[Response] rid: " + basicReturns.getRid() + ", channelId: "
+ basicReturns.getChannelId() + ", type: "
+ response.getType() + ", length: " + response.getLength());
return response;
}
12.5 实现清理过期的会话
/**
* 清理过期的会话
* @param clientSocket
*/
private void clearClosedSession(Socket clientSocket) {
//遍历 sessions 哈希表
List<String> toDeleteChannelId = new ArrayList<>();
for(Map.Entry<String,Socket> entry : sessions.entrySet()){
if(entry.getValue() == clientSocket){
toDeleteChannelId.add(entry.getKey());
}
}
for(String channelId : toDeleteChannelId){
sessions.remove(channelId);
}
System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId: " + toDeleteChannelId);
}
十三、实现客户端
13.1 创建 ConnectionFactory
用来创建连接的工厂类:
@Data
public class ConnectionFactory {
// broker server 的 ip 地址
private String host;
// broker server 的端口号
private int port;
// 访问 broker server 的哪个虚拟主机.
// 下列几个属性暂时不搞了.
// private String virtualHostName;
// private String username;
// private String password;
public Connection newConnection() throws IOException {
Connection connection = new Connection(host, port);
return connection;
}
}
13.2 Connection 和 Channel 定义
- 一个客户端可以创建多个 Connection
- 一个 Connection 对应一个 Socket,一个TCP 连接
- 一个 Connection 可以包含多个 Channel
Connection 定义: (这个类中其他的方法在我的项目源码中自行观看,主要包括处理响应、统一封装写请求和读取响应以及创建 Channel)
@Data
public class Connection {
private Socket socket = null;
// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.
private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
private ExecutorService callbackPool = null;
}
- Socket 是客户端持有的套接字
- InputStream OutputStream DataInputStream ,DataOutputStream 均为 socket 通信的接口
- channelMap 用来管理该连接中所有的 Channel
- callbackPool 是用来在客户端这边执行用户回调的线程池
Channel 定义(这个类中主要包括的方法就是构造请求参数,和服务器交互进行相关的操作,也就是远程调用服务器提供的 API,可在项目源码自行观看)
@Data
public class Channel {
private String channelId;
// 当前这个 channel 属于哪个连接.
private Connection connection;
// 用来存储后续客户端收到的服务器的响应.
private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.
// 此处约定一个 Channel 中只能有一个回调.
private Consumer consumer = null;
public Channel(String channelId, Connection connection) {
this.channelId = channelId;
this.connection = connection;
}
}
十四、样例演示
生产者:
/**
* @description:这个类用来表示一个生产者
* 通常这是一个单独的服务器程序
* @created by 清风 on 2024/8/3 19:38
*/
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建交换机和队列
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("testQueue",true,false,false,null);
//创建一个消息并发送
byte[] body = "hello".getBytes();
boolean ok = channel.basicPublish("testExchange","testQueue",null,body);
System.out.println("消息投递完成! ok :" + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
消费者:
/**
* @description:这个类表示一个消费者
* 通常这个类也应该是在一个独立的服务器中被执行
* @created by 清风 on 2024/8/3 19:39
*/
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
channel.queueDeclare("testQueue",true,false,false,null);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag: " + consumerTag);
System.out.println("basicProperties: " + basicProperties);
String bodyString = new String(body,0, body.length);
System.out.println("body: " + bodyString);
System.out.println("[消费数据] 结束!");
}
});
//模拟一直等待消费
while (true){
Thread.sleep(500);
}
}
}
启动项目之后,再先后启动消费者和生产者:
- 启动项目,建库建表:
- 启动消费者:
- 启动生产者:
- 查看消费者端控制台:
至此,一个简易版本的MQ实现。文章篇幅太长,可能过于繁琐,还请各位读者有不满意的地方多多指教!