目录
文件负责数据的存储, 内存负责数据的管理, 为了保证MQ的传输效率, 所以注定无法使用硬盘管理数据
一. 数据存储方式
这里主要使用哈希表, 链表, 嵌套结构存储和管理数据
交换机管理
第一个key为交换名字(exchangeName), value为交换机对象,
private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
队列管理
第一个key为队列名(queueName), value为队列对象
private ConcurrentHashMap<String, MSGQueue> msgQueueMap = new ConcurrentHashMap<>();
绑定关系管理
第一个key为交换机名(exchangeName), value为HashMap,HashMap中存储的key为队列名(queueName),value为Binding对象
private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
消息管理
第一个Key为消息Id(messageId), value为消息对象
private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
可以使用消息Id快速的查找对象
队列中的所有消息
第一个Key为队列名(queueName), value为链表(链表中的每一个对象都是message对象)
private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
便于对队列中的对象进行管理,适用于队列这种先来先服务模式
队列中未应答消息
第一个key为队列名(queueName),value为hashMap, 其中key为MessageId, value为消息对象
private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitMap = new ConcurrentHashMap<>();
便于维护队列中已发送给消费者但还未被确认(ACK)的消息
总体上采用HashMap的方式, 可以快速的进行索引查询
二. 交换机管理
这里的添加操作和之前的不同, 这里是添加进入内存中的容器HashMap中, 并非数据库中的容器
//增加交换机
public void insertExchange(String exchangeName,Exchange exchange){
exchangeMap.put(exchangeName, exchange);
System.out.println("[MemoryDataCenter] 交换机添加成功");
}
//删除交换机
public void deleteExchange(String exchangeName){
exchangeMap.remove(exchangeName);
System.out.println("[MemoryDataCenter] 交换机删除成功");
}
//查找交换机
public Exchange getExchange(String exchangeName){
return exchangeMap.get(exchangeName);
}
三. 队列管理
这里也是在内存中进行增加, 删除, 查找操作
public void insertQueue(String queueName,MSGQueue queue){
msgQueueMap.put(queueName, queue);
System.out.println("[MemoryDataCenter] 队列添加成功");
}
public void deleteQueue(String queueName){
msgQueueMap.remove(queueName);
System.out.println("[MemoryDataCenter] 队列删除成功");
}
public MSGQueue getQueue(String queueName){
return msgQueueMap.get(queueName);
}
四. 绑定管理
这里采用由内到外的方式
添加绑定
- 先检查内部的HashMap中是否存在数据
- 不存在则创建, 存在则检查最外层的是否存在
- 不存在则创建, 存在则抛出异常,无法添加
public void insertBinding(Binding binding) throws MqException {
// 1.检查这个交换机的value是否存在,如果不存在则需要创建(根据key检查value是否存在)
/* ConcurrentHashMap<String, Binding> bindMap = bindingsMap.get(binding.getExchangeName());
if(bindMap == null){
bindMap = new ConcurrentHashMap<>();
bindingsMap.put(binding.getExchangeName(),bindMap);
}*/
//另一种写法
ConcurrentHashMap<String, Binding> bindMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
k -> new ConcurrentHashMap<>());
synchronized (bindMap) {
// 2.如果根据交换机名和队列名,查询出来的关系存在,则抛出异常,因为已经两者之间已经存在关系,所以不能插入
if (bindMap.get(binding.getQueueName()) != null) {
throw new MqException("[MemoryDataCenter] 绑定关系已经存在,不能继续绑定");
}
bindMap.put(binding.getQueueName(), binding);
System.out.println("[MemoryDataCenter] 绑定添加成功");
}
}
删除绑定
- 先检查内部绑定是否存在
- 不存在则无法删除,存在则删除里面的HashMap
- 然后删除外面的hashMap
public void deleteBinding(Binding binding) throws MqException {
ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if(bindingMap == null){
throw new MqException("[MemoryDataCenter] 绑定关系不存在,无法删除");
}
bindingMap.remove(binding.getQueueName());
System.out.println("[MemoryDataCenter] 绑定删除成功");
}
获取绑定(唯一)
这里采用从外到内的方式查询
- 先检查交换机是否存在
- 不存在则返回,存在则继续查看交换机
- 存在返回数据,不存在返回 null
public Binding getBinding(String exchangeName ,String queueName){
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
//判断最外层value是否为空
if(bindingMap==null){
return null;
}
return bindingMap.get(queueName);
}
获取所有的绑定
- 直接根据交换机进行查找并返回
public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){
return bindingsMap.get(exchangeName);
}
五. 消息管理 ! ! !
添加消息
public void insertMessage(Message message){
messageMap.put(message.getMessageId(),message);
System.out.println("[MemoryDataCenter] 添加消息成功");
}
查询消息(根据MessageID查询消息)
//根据messageID查询消息
public Message getMessage(String messageId){
return messageMap.get(messageId);
}
删除消息(根据MessageId删除消息)
- 先检查消息是否存在
- 如果存在才能进行删除操作
//根据messageId删除消息
public void deleteMessage(String messageId) throws MqException {
Message message = messageMap.get(messageId);
if(message==null){
throw new MqException("[MemoryDataCenter] 消息不存在,无法删除");
}
messageMap.remove(messageId);
System.out.println("[MemoryDataCenter] 删除消息成功");
}
队列中的消息管理
发送消息到队列
- 检查队列中是否存在数据列表
- 存在则向其中添加数据, 不存在则创建一个链表
- 将消息直接添加到队列中
- 在总消息统计中也需要添加
public void sendMessage(MSGQueue queue,Message message){
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());
synchronized (messages){
messages.add(message);
}
//在队列中添加,在总的消息管理中也需要添加
//因为messageID相同,所有也不用担心内容不一样
insertMessage(message);
System.out.println("[MemoryDataCenter] 消息成功放入队列中");
}
从队列中取出消息
- 检查是否存在列表
- 如果不存在, 则直接返回 null, 如果存在判断里面是否存在数据
- 如果存在数据,则返回第一个,如果不存在则返回 null
public Message pollMessage(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
//判断队列中有没有消息
if(messages==null){
return null;
}
synchronized (messages){
if(messages.size()==0){
return null;
}
//采用从头取出(头插法)
Message curmessage = messages.remove(0);
System.out.println("[MemoryDataCenter] 消息成功从队列中取出");
return curmessage;
}
}
获取队列中消息的个数
- 判断链表是否存在
- 如果不存在,直接返回0,
- 如果存在则返回消息个数
public int getQueueMessageCount(String queueName){
LinkedList<Message> messages = queueMessageMap.get(queueName);
//判断队列中有没有消息
if(messages == null){
return 0;
}
synchronized (messages){
return messages.size();
}
}
队列中未确认的消息管理
添加未确认消息
- 检查HashMap是否存在,
- 如果不存在,则创建一个,如果存在,则向里面添加未处理的消息
public void insertMessageWaitAck(MSGQueue queue,Message message) {
ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.computeIfAbsent(queue.getName(), k -> new ConcurrentHashMap<>());
messageWaitAckMap.put(message.getMessageId(), message);
System.out.println("[MemoryDataCenter] 未处理消息添加成功");
}
删除未确认的消息
- 检查HashMap是否存在
- 如果不存在, 则返回null, 如果存在,则进行删除操作
public void deleteMessageWaitAck(String queueName,String messageId){
ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);
if(messageWaitAckMap==null){
return;
}
messageWaitAckMap.remove(messageId);
System.out.println("[MemoryDataCenter] 未处理消息删除成功");
}
获取指定的未确认消息
- 检查HashMap是否存在
- 如果不存在, 则返回null, 如果存在, 则返回其中具体消息(根据MessageId返回)
public Message getMessageWaitAck(String queueName,String messageId){
ConcurrentHashMap<String, Message> messageWaitAckMap = queueMessageWaitMap.get(queueName);
if(messageWaitAckMap==null){
return null;
}
return messageWaitAckMap.get(messageId);
}
六.数据加载
将文件数据加载到内存中
- 保证内存存储数据为空
- 加载交换机数据
- 加载队列数据
- 加载绑定关系
- 加载消息数据 (1. 所有的消息 2. 队列下的消息)
public void recovery(DiskDataCenter dataCenter) throws IOException, MqException, ClassNotFoundException {
// 1.清空之前的数据
exchangeMap.clear();
msgQueueMap.clear();
bindingsMap.clear();
messageMap.clear();
queueMessageWaitMap.clear();
// 2.加载交换机数据
List<Exchange> exchanges = dataCenter.selectAllExchange();
for (Exchange exchange :exchanges){
exchangeMap.put(exchange.getName(),exchange);
}
// 3.加载队列数据
List<MSGQueue> msgQueues = dataCenter.selectAllMSGQueue();
for (MSGQueue queue:msgQueues){
msgQueueMap.put(queue.getName(),queue);
}
// 4.加载绑定关系(将一个个绑定添加进里面)
//先检查交互机是否存在,不存在则创建对应的hashMap,然后添加其中的内容
List<Binding> bindings = dataCenter.selectAllBinding();
for (Binding binding:bindings){
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());
bindingMap.put(binding.getQueueName(), binding);
}
// 5.恢复所有的消息数据(添加每一个队列中的消息)(这里有两个,1.管理所有的消息,2.管理队列下的消息)
for(MSGQueue queue: msgQueues){
LinkedList<Message> messages = dataCenter.selectAllMessage(queue);
//队列下的消息
queueMessageMap.put(queue.getName(), messages);
for (Message message:messages){
messageMap.put(message.getMessageId(),message);
}
}
// 6.未被确认的消息
//这里未被确认的消息,并不会被加载进入内存,考虑情况:服务器如果重启,那么未被确认的消息,再一次被加载,实际上被当做没有取出的消息
//这时候只需要重新取出未被取出的消息,那么和方法5(加载数据)是一样的
}
未被确认的消息怎么处理?
在本项目中, 数据加载的过程, 多数出现在重启服务器的场景中, 未被确认的消息并不会直接存储进入内存, 而是被当做普通消息加载进入内存, 已经被确认的数据会被直接删除掉, 未被确认的数据会被当做新数据加载进入内存
缺点: 造成消息的冗余发送 优点: 实现方便
优化建议:
1. 数据的存储持久化
引入一个标志位, 表示消息是否处于发送且未确认状态, 在数据加载的时候, 根据标志位将数据加载放入内存存储
2. 超时重投机制
如果消息发送给消费者, 消费者消费数据时, 未发送确认就崩溃, 那么消息会一直处于发送未确认状态, 会造成资源浪费
检测超时未确认的消息, 超过一定时间间隔会触发重新发送消息
3. 重投次数限制和死信队列
避免由于消费者的持续异常状态(依赖不可用等 )造成的无限重投, 可以规定最多重投次数, 超过该次数, 则丢到死信队列中, 方便人为去分析处理这些未被确认的消息