RabbitMinQ(模拟实现消息队列项目)02

发布于:2025-09-02 ⋅ 阅读:(20) ⋅ 点赞:(0)

目录

十.整合数据库和文件数据

创建DiskDataManager类

十一.内存结构设计

创建MeneryDataCenter类:

实现集合操作:

对MemoryDataCenter类功能测试:

十二.整合内存和磁盘数据

创建VirtualHost类:

Exchange:

MSGQueue:

Binding:

创建Router类

对Router类的TOPIC匹配进行测试:

发送消息:

创建ConsumerManager类:

订阅消息:

创建ConsumerEnv类:

创建Consumer接口:

为MSGQueue类新增加二个属性和方法,用于管理订阅队列的消费者集合:

在ConsumerManager类中实现添加消费者方法:

十三.网络通信协议设计

设计应用层协议:

创建request类:

创建response类:

创建参数父类:

创建响应父类:

创建设备功能的参数类:

十四.实现BrokerServer

十五.实现客户端

创建ConnectionFactory.

创建Connection类:

创建Channel类:

客户端代码测试:

十六.完成

成果测试:

启动消息队列服务器:

创建生产者 发送消息:

创建消费者消费消息:


十.整合数据库和文件数据

上面的代码中,使用数据库存储了Exchange,Queue,Binding,使用文件存储了Message,

下面对数据库和文件中的数据进行整合.进行统一管理.

创建DiskDataManager类

/**
 * 对数据库中的Exchange,Queue,Binding和文件中的Message数据进行整合
 * 统一管理,后续上层代码直接调用该类中的方法即可,无需再向下层数据结构调用
 */
public class DiskDataManager {
    private DataBaseManager dataBaseManager = new DataBaseManager();
    private MessageFileManager messageFileManager = new MessageFileManager();

    public void init() throws JsonProcessingException {
        dataBaseManager.init();
        messageFileManager.init();
    }

    //交换机:
    //添加交换机
    public void insertExchange(Exchange exchange){
        dataBaseManager.insertExchange(exchange);
    }
    //删除交换机
    public void deleteExchange(String exchangeName){
        dataBaseManager.deleteExchange(exchangeName);
    }
    //查找交换机
    public List<Exchange> selectAllExchanges(){
        return dataBaseManager.selectAllExchanges();
    }
    //队列
    //添加队列
    public void insertQueue(MSGQueue queue) throws IOException, MqException {
        dataBaseManager.insertQueue(queue);
        //创建队列后,不仅要将队列写入到数据库中,还要创建出对应的目录和文件
        messageFileManager.createQueueFile(queue.getName());
    }

    //删除队列
    public void deleteQueue(String queueName) throws IOException {
        dataBaseManager.deleteQueue(queueName);
        //删除队列后,还要讲对应的目录和文件删除
        messageFileManager.destoryQueueFile(queueName);
    }
    //查找队列
    public List<MSGQueue> selectAllQueues(){
        return dataBaseManager.selectAllQueues();
    }
    //绑定关系
    //添加绑定关系
    public void insertBinding(Binding binding){
        dataBaseManager.insertBinding(binding);
    }
    //删除绑定关系
    public void deleteBinding(String bingingKey){
        dataBaseManager.deleteBindings(bingingKey);
    }
    //查找绑定
    public List<Binding> selectAllBindings(){
        return dataBaseManager.selectAllBindings();
    }
    //消息
    //发送消息
    public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {
        messageFileManager.sendMessage(queue,message);
    }

    //删除消息
    public void deleteMessageFromQueue(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {
        messageFileManager.deleteMessageFromFile(queue,message);
        //删除消息后,查看是否需要进行GC
        if(messageFileManager.checkGC(queue.getName())){
            messageFileManager.GC(queue);
        }
    }

    //加载所有的消息到内存中
    public List<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
        return messageFileManager.loadAllMessage(queueName);
    }
}

十一.内存结构设计

将数据存储到数据库和文件,是为了实现其持久性,但数据还是要存储在内存上的,这样才能更快的访问到数据.

创建MeneryDataCenter类:

这里通过设计不同的数据集合来存储数据在内存中.

/**
 * 将数据存储在内存中,创建不同的数据集合来管理
 * 要管理的数据有:
 * 交换机
 * 队列
 * 绑定关系
 * 消息
 * 队列中的消息集合
 * 待确认消息队列中的消息集合
 */
public class MemoryDataCenter {
    //key:exchangeName
    private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
    //key:queueName
    private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
//   key1:exchangeName key2:queueName
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
    //key: messageId
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
//  key:queueName  List:message
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
    //存储在手动确认模式下,管理待确认的消息和队列,在未收到确认消息时,要先将数据存储到这个数据集合中,
//  key1:queueName   key2:messageId
    private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> WaitAckQueueMessageMap = new ConcurrentHashMap<>();
}

实现集合操作:

 //交换机:
    //插入
    public void insertExchange(Exchange exchange){
        exchangeMap.put(exchange.getName(),exchange);
        System.out.println("[MemoryDataCenter] 新增交换机成功 exchangeName:"+exchange.getName());
    }
    //删除
    public void deleteExchange(String exchangeName){
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] 删除交换机成功 exchangeName:"+exchangeName);
    }
    //查找
    public Exchange getExchange(String exchangeName){
        Exchange exchange = exchangeMap.get(exchangeName);
        return exchange;
    }

 //队列
    //插入
    public void insertQueue(MSGQueue queue){
        queueMap.put(queue.getName(),queue);
        System.out.println("[MemoryDataCenter] 新增队列成功! queueName: "+queue.getName());
    }
    //删除
    public void deleteQueue(String queueName){
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] 队列删除成功! queueName: "+queueName);
    }
    //查找
    public MSGQueue getQueue(String queueName){
        return queueMap.get(queueName);
    }

//绑定关系:
    //新增
    public void insertBinding(Binding binding) throws MqException {
//        //绑定关系不存在时,创建一个,存在时,进行覆盖
//        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getQueueName());
//        if(bindingMap==null){
//            bindingMap = new ConcurrentHashMap<>();
//        }
//        bindingMap.put(binding.getQueueName(),binding);
//        bindingsMap.put(binding.getExchangeName(),bindingMap);
        //这个方法是ConcurrentMap方法用来判断对应的哈希表是否存在,不存在就执行第二个参数,存在就直接赋值,和上面的逻辑是一样的
        //且该方法是原子的,不存在线程安全问题
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                f -> new ConcurrentHashMap<>());
        //此处可能会存在线程安全问题,以绑定关系为基准进行上锁
        synchronized (binding){
            Binding binding1 = bindingMap.get(binding.getQueueName());
            //当绑定关系已经存在时,抛出异常,只有新的绑定插入时,才会成功
            if(binding1!=null){
                throw new MqException("[MemoryDataCenter] 绑定已存在 exchangeName: "+binding.getExchangeName()
                        +" ,queueName: "+binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(),binding);
        }
        System.out.println("[MemoryDataCenter] 新的绑定创建成功! +exchangeName:"+binding.getExchangeName()
                +" ,queueName: "+binding.getQueueName()+" ,bindingKey: "+binding.getBindingKey());
    }
    //删除
    public void deleteBinding(Binding binding) throws MqException {
    //先判断交换机是否存在绑定,不存在时,无法删除.抛出异常
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if(bindingMap==null){
            throw new MqException("[MemoryDataCenter] 无绑定关系,删除失败 exchangeName: "+binding.getExchangeName()
                    +" ,queueName: "+binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] 绑定删除成功 exchangeName: "+binding.getExchangeName()
                +" ,queueName: "+binding.getQueueName()+" ,bindingKey:"+binding.getBindingKey());
    }
    //查找
    public Binding getBinding(String exchangeName,String queueName) throws MqException {
        ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
        if(bindingMap==null){
            return null;
        }
        Binding binding = bindingMap.get(queueName);
        return binding;
    }

//消息
    //插入
    public void insertMessage(Message message){
        messageMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] 新增消息成功 messageID:"+message.getMessageId());
    }
    //删除
    public void deleteMessage(String messageId){
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 消息删除陈功 messageId: "+messageId);
    }
    //查找
    public Message getMessage(String messageId){
        return messageMap.get(messageId);
    }

    //队列消息集合
    //发送消息到指定队列
    public void sendMessage(MSGQueue queue,Message message){
        //1.先查找队列对应的集合是否存在,不存在时创建消息集合
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), f -> new LinkedList<>());
        //这里当多个线程同时执行插入操作时,可能会覆盖消息,要以集合为维度进行上锁
        synchronized(messages){
            messages.add(message);
        }
        //将消息也存入到消息集合中
        messageMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] 发送消息到队列成功 queueName:"+queue.getName()+
                " ,messageId:"+message.getMessageId());
    }
    //从队列中取消息
    public Message pollMessage(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages==null || messages.isEmpty()){
            return null;
        }
        Message message = messages.remove(0);
        System.out.println("[MemoryDataCenter] 从队列中取消息成功 queueName:"+queueName+
                " ,messageId:"+message.getMessageId());
        return message;
    }
    //获取队列中的消息个数
    public int getMessageCountFromQueue(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages==null) return 0;
        //此处获取集合中元素个数可能存在线程安全问题,对集合进行上锁
        synchronized(messages){
            return messages.size();
        }
    }

//待确认消息集合
    //发送消息到待确认消息集合
    public void sendWaitMessage(String queueName,Message message){
        ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.computeIfAbsent(queueName, f -> new ConcurrentHashMap<>());
        //此处向待确认消息集合中插入数据时,也可能存在线程安全问题,以集合为维度加锁
        synchronized (waitMessagesMap){
            waitMessagesMap.put(message.getMessageId(),message);
        }
        System.out.println("[MemoryDataCenter] 发送待确认消息到队列成功 queueName:"+queueName+
                " ,messageId:"+message.getMessageId());
    }
    //从队列中取待确认消息
    public Message pollWaitMessage(String queueName,String messageId){
        ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);
        if(waitMessagesMap==null){
            return null;
        }
        Message message = waitMessagesMap.get(messageId);
        if(message==null){
            return null;
        }
        System.out.println("[MemoryDataCenter] 从队列中取代确认消息成功 messageId:"+messageId+
                " ,queueName:"+queueName);
        return message;
    }
    //从队列中删除待确认消息
    public void deleteWaitMessage(String queueName,String messageId){
        ConcurrentHashMap<String, Message> waitMessagesMap = queueMessageWaitAckMap.get(queueName);
        if(waitMessagesMap==null){
            System.out.println("[MemoryDataCenter] 待确认消息队列不存在,消息删除失败 messageId:"+messageId+
                    " ,queueName:"+queueName);
        }
        waitMessagesMap.remove(messageId);
        System.out.println("[MemoryDataCenter] 待确认消息删除成功 messageId:"+messageId+
                " ,queueName:"+queueName);
    }

//恢复所有硬盘中的数据
    //当服务器重启后,内存中的数据都不存在了,要从磁盘中获取数据
    public void recovery(DiskDataManager diskDataManager) throws IOException, MqException, ClassNotFoundException {
        //先将内存中的集合都清空,防止存在残留数据
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        queueMessageWaitAckMap.clear();
        //恢复交换机数据
        List<Exchange> exchanges = diskDataManager.selectAllExchanges();
        for(Exchange e:exchanges){
            String exchangeName = e.getName();
            exchangeMap.put(exchangeName,e);
        }
        //恢复队列数据
        List<MSGQueue> queues = diskDataManager.selectAllQueues();
        for(MSGQueue q:queues){
            String queueName = q.getName();
            queueMap.put(queueName,q);
        }
        //恢复绑定关系
        List<Binding> bindings = diskDataManager.selectAllBindings();
        for(Binding b:bindings){
            ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(b.getExchangeName(), f -> new ConcurrentHashMap<>());
            bindingMap.put(b.getQueueName(),b);
        }

        //恢复消息
        for(MSGQueue q:queueMap.values()){
            List<Message> messages = diskDataManager.loadAllMessageFromQueue(q.getName());
            for(Message m:messages){
                messageMap.put(m.getMessageId(),m);
            }
        }
        //对于未确认消息,当服务器重启后,服务器中所有的消息都要重新发送,未被确认的消息就都成了未被取走的消息了,
        //对于未确认的消息, 就不需要回复这些数据了
    }

对MemoryDataCenter类功能测试:

@SpringBootTest
public class MemoryDataCenter {
    private MemoryDataCenter memoryDataCenter;
    @BeforeEach
    public void setUp(){
        memoryDataCenter = new MemoryDataCenter();
        System.out.println("前置工作已经准备后!");
    }

    @AfterEach
    public void tearDown(){
        memoryDataCenter = null;
        System.out.println("收尾工作以完成!");
    }
}

测试功能:

//测试交换机相关操作
    private Exchange createExchange(String exchangeName){
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        return exchange;
    }
    @Test
    void testExchange(){
        Exchange exchange = createExchange("exchangeTest");
        memoryDataCenter.insertExchange(exchange);

        Exchange act = memoryDataCenter.getExchange(exchange.getName());
        Assertions.assertEquals(exchange.getName(),act.getName());
        Assertions.assertEquals(exchange.getType(),act.getType());
        Assertions.assertEquals(exchange.isDurable(),act.isDurable());
        Assertions.assertEquals(exchange.isAutoDelete(),act.isAutoDelete());

        memoryDataCenter.deleteExchange(exchange.getName());
        act = memoryDataCenter.getExchange(exchange.getName());
        Assertions.assertNull(act);
    }

//测试队列相关操作
    private MSGQueue createQueue(String queueName){
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setAutoDelete(false);
        return queue;
    }
    @Test
    void testQueue(){
        MSGQueue queue = createQueue("queueTest");
        memoryDataCenter.insertQueue(queue);
        MSGQueue act = memoryDataCenter.getQueue(queue.getName());

        Assertions.assertEquals(queue.getName(),act.getName());
        Assertions.assertEquals(queue.isDurable(),act.isDurable());
        Assertions.assertEquals(queue.isAutoDelete(),act.isAutoDelete());

        memoryDataCenter.deleteQueue(queue.getName());
        act  = memoryDataCenter.getQueue(queue.getName());
        Assertions.assertNull(act);
    }
 //测试绑定关系相关操作
    private Binding createBinding(String exchangeName,String queueName,String bindingKey){
        Binding binding = new Binding();
        binding.setExchangeName(exchangeName);
        binding.setQueueName(queueName);
        binding.setBindingKey(bindingKey);
        return binding;
    }
    @Test
    void testBinding() throws MqException {
        //要先创建队列和交换机
        Exchange exchange = createExchange("exchangeTest");
        MSGQueue queue = createQueue("queueTest");
        Binding binding = createBinding(exchange.getName(), queue.getName(), "bindingKeyTest");

        memoryDataCenter.insertBinding(binding);
        Binding act = memoryDataCenter.getBinding(exchange.getName(), queue.getName());
        Assertions.assertEquals(binding.getExchangeName(),act.getExchangeName());
        Assertions.assertEquals(binding.getQueueName(),act.getQueueName());
        Assertions.assertEquals(binding.getBindingKey(),act.getBindingKey());

        memoryDataCenter.deleteBinding(binding);
        act = memoryDataCenter.getBinding(act.getExchangeName(),act.getQueueName());
        Assertions.assertNull(act);
    }

//测试消息操作
    private Message createMessage(String body){
        Message message = new Message();
        return message.createMessageById("routingKeyTest",null,body.getBytes());
    }
    @Test
    public void testMessage(){
        Message expectedMessage = createMessage("testMessage");
        memoryDataCenter.insertMessage(expectedMessage);

        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage,actualMessage);

        //删除消息
        memoryDataCenter.deleteMessage(expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

//测试队列中的消息集合
    @Test
    void testQueueMessage(){
        MSGQueue queue = createQueue("queueTest");
        Message message = createMessage("hello");
        memoryDataCenter.sendMessage(queue,message);
        int n = memoryDataCenter.getMessageCountFromQueue(queue.getName());
        Assertions.assertEquals(1,n);

        Message act = memoryDataCenter.pollMessage(queue.getName());
        n = memoryDataCenter.getMessageCountFromQueue(queue.getName());
        Assertions.assertEquals(0,n);
        Assertions.assertEquals(message.getMessageId(),act.getMessageId());
        Assertions.assertArrayEquals(message.getBody(),act.getBody());
        Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());
        Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());
    }

//测试待确认队列集合
    @Test
    void testWaitMessageQueue(){
        MSGQueue queue = createQueue("queueTest");
        Message message = createMessage("hello");
        memoryDataCenter.sendWaitMessage(queue.getName(), message);

        Message act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());
        Assertions.assertEquals(message.getMessageId(),act.getMessageId());
        Assertions.assertArrayEquals(message.getBody(),act.getBody());
        Assertions.assertEquals(message.getIsVaild(),act.getIsVaild());
        Assertions.assertEquals(message.getDeliveryMode(),act.getDeliveryMode());

        memoryDataCenter.deleteWaitMessage(queue.getName(), message.getMessageId());
        act = memoryDataCenter.pollWaitMessage(queue.getName(), message.getMessageId());
        Assertions.assertNull(act);
    }

//测试加载磁盘所有数据到内存
    @Test
    void testRecovery() throws IOException, MqException, ClassNotFoundException {
        //这里需要使用到mybatis,需要进行了类加载,先启动SpringApplication
        Mq02Application.context = SpringApplication.run(Mq02Application.class);
        //在磁盘上构造好数据:
        DiskDataManager diskDataCenter = new DiskDataManager();
        diskDataCenter.init("");
        //创建交换机:
        Exchange exchange = createExchange("testExchange");
        diskDataCenter.insertExchange(exchange);
        //创建队列:
        MSGQueue queue = createQueue("testQueue");
        diskDataCenter.insertQueue(queue);
        //创建绑定
        Binding binding = new Binding();
        binding.setExchangeName(exchange.getName());
        binding.setQueueName(queue.getName());
        binding.setBindingKey("bindingKey");
        diskDataCenter.insertBinding(binding);
        //创建消息
        Message message = createMessage("testContext");
        diskDataCenter.sendMessage(queue,message);

        //执行恢复:
        memoryDataCenter.recovery(diskDataCenter);
        //结果比对:
        //交换机:
        Exchange actualExchange = memoryDataCenter.getExchange(exchange.getName());
        Assertions.assertEquals(exchange.getName(),actualExchange.getName());
        Assertions.assertEquals(exchange.getType(),actualExchange.getType());
        Assertions.assertEquals(exchange.isDurable(),actualExchange.isDurable());
        Assertions.assertEquals(exchange.isAutoDelete(),actualExchange.isAutoDelete());
        //队列:
        MSGQueue actualQueue = memoryDataCenter.getQueue(queue.getName());
        Assertions.assertEquals(queue.getName(),actualQueue.getName());
        Assertions.assertEquals(queue.isDurable(),actualQueue.isDurable());
        Assertions.assertEquals(queue.isAutoDelete(),actualQueue.isAutoDelete());
        //绑定:
        Binding actulaBinding = memoryDataCenter.getBinding(exchange.getName(), queue.getName());
        Assertions.assertEquals(binding.getExchangeName(),actulaBinding.getExchangeName());
        Assertions.assertEquals(binding.getQueueName(),actulaBinding.getQueueName());
        Assertions.assertEquals(binding.getBindingKey(),actulaBinding.getBindingKey());
        //消息:
        Message actualMessage = memoryDataCenter.getMessage(message.getMessageId());
        Assertions.assertEquals(message.getMessageId(),actualMessage.getMessageId());
        Assertions.assertEquals(message.getDeliveryMode(),actualMessage.getDeliveryMode());
        Assertions.assertEquals(message.getRoutingKey(),actualMessage.getRoutingKey());
        Assertions.assertArrayEquals(message.getBody(),actualMessage.getBody());
        //   清除文件
        //清理之前要先关闭文件
        Mq02Application.context.close();
        File file = new File("./data");
        FileUtils.deleteDirectory(file);
    }

十二.整合内存和磁盘数据

将内存和磁盘上的数据进行整合,用"虚拟机"这个概念将其整合起来. 不同虚拟机中的交换机 队列,绑定关系,消息都是不互通的. 此处为了简化,仅实现单台虚拟主机,但在数据结构上设置不同虚拟主句名
 为区分不同的虚拟主机上的设备,通过配置设备名区别:(以虚拟机名为前缀)
 规定:
 *  exchangeName = virtualHostName+exchangeName;
 *  queueName = virtualHostName+queueName;
并且将调用的方法抛出的异常都在这个类中进行处理,不再向上抛出

创建VirtualHost类:

@Data
public class VirtualHost {
    private String virtualHostName;
    private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
    private DiskDataManager diskDataManager = new DiskDataManager();

    public VirtualHost(String virtualHostName){
        this.virtualHostName = virtualHostName;
        //初始化磁盘数据:
        diskDataManager.init();
        //初始化内存数据
        try {
            memoryDataCenter.recovery(diskDataManager);
        } catch (IOException | MqException | ClassNotFoundException e) {
            System.out.println("[VirtualHost] 内存数据恢复失败");
            e.printStackTrace();
        }
    }
}

Exchange的声明和删除:

 
    //在对交换机在内存和磁盘上插入和删除数据时,可能存在线程安全问题,要以交换机为维度对其上锁
    //交换机锁对象:
    private final Object exchangeLocker = new Object();
    //交换机操作:
    //创建交换机,
    //创建后,将其保存到内存和磁盘上
    public boolean exchangeDeclare(String exchangeName, ExchangeType type, boolean durable,
                                   boolean autoDelete, Map<String,Object> args){
       //   先根据约定 设置交换机名
        exchangeName = virtualHostName + exchangeName;
        synchronized(exchangeLocker){
            //先在内存上查找,若已存在,则直接返回
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if(exchange!=null){
                System.out.println("[VirtualHost] 交换机已经存在,不再创建 exchangeName:"+exchangeName);
                return true;
            }
            exchange = new Exchange();
            exchange.setName(exchangeName);
            exchange.setType(type);
            exchange.setDurable(durable);
            exchange.setAutoDelete(autoDelete);
            //这里对args参数的设置.要在Exchange类中再为args关于Map参数添加set和get方法
            exchange.setArgs(args);
            //先存入数据库,再存入内存中,
            //这个顺序是:插入数据库操作比较容易出现异常,存内存出现异常的可能小较小
            //         若插入数据库失败,则不再存入内存中;
            //         若是转换顺序,当存数据库出现异常时,还要将内存中的数据再删了,比较麻烦
            if(durable){
                //当交换机设置为持久化时,将其存入内存:
                diskDataManager.insertExchange(exchange);
            }
            //存入内存
            memoryDataCenter.insertExchange(exchange);
            System.out.println("[VirtualHost] 交换机创建成功 exchangeName:"+exchangeName);
            return true;
        }
    }
    //删除交换机
    //在内存和磁盘上将数据删除
    public boolean exchangeDelete(String exchangeName){
        exchangeName = virtualHostName + exchangeName;
        try {
            synchronized (exchangeLocker){
                Exchange exchange = memoryDataCenter.getExchange(exchangeName);
                if(exchange==null){
                    throw new MqException("[VirtualHost] 要删除的交换机不存在 exchangeName:"+exchangeName);
                }
                //删除内存数据:
                memoryDataCenter.deleteExchange(exchangeName);
                //删除磁盘数据:
                boolean durable = exchange.isDurable();
                if(durable){
                    diskDataManager.deleteExchange(exchangeName);
                }
                System.out.println("[VirtualHost] 交换机删除成功 exchangeName:"+exchangeName);
                return true;
            }
        } catch (MqException e) {
            System.out.println("[VirtualHost] 交换机删除失败 exchangeName:"+exchangeName);
            e.printStackTrace();
        }
        return false;
    }

在Exchange类中关于args属性上,再增加关于Map参数类型的set方法:

 public void setArgs(Map<String,Object> args){
        this.args = args;
    }

MSGQueue的声明和删除:


    //在对队列在内存和磁盘上插入和删除数据时,可能存在线程安全问题,要以队列为维度对其上锁
    //创建 队列锁对象:
    private final Object queueLocker = new Object();
    /**队列
     * 创建队列:创建队列并将其存入到磁盘和内存中
     */
    public boolean queueDeclare(String queueName,boolean isDurable,boolean autoDelete, Map<String,Object> args){
        queueName = virtualHostName+queueName;
        try {
            synchronized(queueLocker){
                MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
                if(existsQueue!=null){
                    System.out.println("[VirtualHost] 队列已经存在 queueName:"+queueName);
                    return true;
                }
                MSGQueue queue = new MSGQueue();
                queue.setName(queueName);
                queue.setDurable(isDurable);
                queue.setAutoDelete(autoDelete);
                //此处在MSGQueue类中,针对args属性,要实现关于Map类型的set方法
                queue.setArgs(args);
                //存入磁盘
                if(isDurable) {
                    diskDataManager.insertQueue(queue);
                }
                //存入内存
                memoryDataCenter.insertQueue(queue);
                System.out.println("[VirtualHost] 创建队列成功 !");
            }
            return true;
        } catch (IOException | MqException e) {
            System.out.println("[VirtualHost] 创建队列失败 queueName:"+queueName);
            e.printStackTrace();
            return false;
        }
    }
    //删除队列:从磁盘和内存中 删除队列
    public boolean queueDelete(String queueName){
        queueName = virtualHostName+queueName;
        try{
            synchronized(queueLocker){
                MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);
                if(existsQueue==null){
                    throw new MqException("[VirtualHost] 队列不存在,删除队列失败 queueName:"+queueName);
                }
                if(existsQueue.isDurable()){
                    diskDataManager.deleteQueue(queueName);
                }
                memoryDataCenter.deleteQueue(queueName);
                System.out.println("[VirtualHost] 删除队列成功 queueName:"+queueName);
            }
            return true;
        }catch (Exception e) {
            System.out.println("[VirtualHost] 队列删除失败! queueName:" +queueName);
            e.printStackTrace();
            return false;
        }
    }

同样,在MSGQueue类中关于args属性上,再增加关于Map参数类型的set方法:

 public void setArgs(Map<String,Object> args){
        this.args = args;
    }

Binding的创建和删除:


//  该类实现和绑定相关的操作
    private Router router = new Router();
    //绑定的插入和删除
    //插入绑定
    public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey){
        exchangeName = virtualHostName + exchangeName;
        queueName = virtualHostName + queueName;
        try{
            //1.验证绑定是否存在,不存在再创建
            Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
            if(binding!=null){
                throw new MqException("[VirtualHost] 绑定已存在 exchangeName:"+exchangeName+
                        " ,queueName:"+queueName);
            }
            //这里再创建一个类router,实现关于绑定相关的操作
            //2.判断bindingKey格式是否正确
            boolean ok = router.checkBindingKey(bindingKey);
            if(!ok){
                throw new MqException("[VirtualHost] 绑定格式有误 bindingKey:"+bindingKey);
            }
            //3.创建绑
            binding = new Binding();
            binding.setExchangeName(exchangeName);
            binding.setQueueName(queueName);
            binding.setBindingKey(bindingKey);
            //4.验证绑定的队列和交换机是否存在
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if(queue==null){
                throw new MqException("[VirtualHost] 要绑定的队列不存在 queueName:"+queueName+
                        " ,bindingKey:"+bindingKey);
            }
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if(exchange==null){
                throw new MqException("[VirtualHost] 要绑定的交换机不存在 exchangeName:"+exchangeName+
                        " ,bindingKey:"+bindingKey);
            }
            //5,存入磁盘
            //当队列和交换机同时设置持久化时,将该绑定关系存入磁盘
            if(queue.isDurable() && exchange.isDurable()){
                diskDataManager.insertBinding(binding);
            }
            //6.存入内存
            memoryDataCenter.insertBinding(binding);
            System.out.println("[VirtualHost] 创建绑定成功 bindingKey: "+bindingKey);
            return true;
        }catch (MqException e) {
            System.out.println("[VirtualHost] 创建绑定失败 bindingKey:"+bindingKey);
            e.printStackTrace();
        }
        return false;
    }

    //删除绑定
    public boolean bindingDelete(String exchangeName,String queueName){
        exchangeName = virtualHostName + exchangeName;
        queueName = virtualHostName + queueName;
        try {
            Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);
            if(binding==null){
                throw new MqException("[VirtualHost] 绑定不存在 queueName:"+queueName+
                        " ,exchangeName:"+exchangeName);
            }
            //从内存删除
            memoryDataCenter.deleteBinding(binding);
            //从磁盘删除
            //此处可能绑定没有保存在磁盘上,删除失败,但没有关系,没有影响
            diskDataManager.deleteBinding(binding.getBindingKey());
            System.out.println("[VirtualHost] 删除绑定成功 exchangeName:"+exchangeName+
                    " ,queueName:"+queueName +" , bindingKey:"+binding.getBindingKey());
            return true;
        }catch(Exception e){
            System.out.println("[VirtualHost] 删除绑定失败 exchangeName:"+exchangeName+
                    " ,queueName:"+queueName);
            e.printStackTrace();
            return false;
        }
    }

创建Router类:

实现匹配判断功能:

/**
 * 该类实现和绑定相关的操作
 //路由规定:
 //routingKey: 只能由 数字 字母(大小写) 下划线 构成,使用.作为分割
 //bindingKey:只能包含 数字 字母 下划线 * #,以 . 作为分割,* #只能作为独立的分段
 */
public class Router {
    //判断消息携带的绑定格式是否正确
    public boolean checkRoutingKey(String routingKey){
        char[] ch = routingKey.toCharArray();
        for(char i:ch){
            if(i>='a' && i<='z') continue;
            if(i>='A' && i<='Z') continue;
            if(i>='0' && i<='9') continue;
            if(i=='.' || i=='_') continue;
            else return false;
        }
        return true;
    }

    //判断绑定格式是否正确
    public boolean checkBindingKey(String bindingKey){
        char[] ch = bindingKey.toCharArray();
        for(char c:ch){
            if(c>='A' && c<='Z') continue;
            if(c>='a' && c<='z') continue;
            if(c>='0' && c<='9') continue;
            if(c=='_' || c=='*' || c=='#' || c=='.') continue;
            else return false;
        }
        //规定不能让* #相连,即出现以下情况规定不成立:
        // *.#   #.*  #.#
        //以 . 对字符串进行分隔,判断
        String[] s = bindingKey.split("\\.");
        for(int i=0;i<s.length-1;i++){
            if (s[i].equals("*") && s[i+1].equals("#") ||
                s[i].equals("#") && s[i+1].equals("*") ||
                s[i].equals("#") && s[i+1].equals("#")) {
                return false;
            }
        }
        return true;
    }
    //判断bindingKey与routingKey是否匹配成功
    public boolean isRouting(ExchangeType type,String routingKey,String bindingKey) throws MqException {
        //判断当前交换机类型:fanout/topic
        if(type==ExchangeType.FANOUT){
            //匹配到绑定交换机的所有队列
            //直接返回即可
            return true;
        }else if(type==ExchangeType.TOPIC){
            //进行routingKey和BindingKey的匹配判断
            return routingTopic(routingKey,bindingKey);
        }else{
            throw new MqException("[Router] 交换机类型有误 type:"+type);
        }
    }

    /**
     * 规定:rotingKey匹配bindingKey
     *  *:匹配任意单个字符串
     *  #:匹配任意个任意字符串
     * @param routingKey 消息携带的匹配字符串
     * @param bindingKey 交换机和队列的绑定关系
     * @return
     */
    private boolean routingTopic(String routingKey, String bindingKey) {
        String[] b = bindingKey.split("\\.");
        String[] r = routingKey.split("\\.");
        int n1 = b.length;
        int n2 = r.length;
        int i = 0;
        int j = 0;
        while(i<n1 && j<n2){
            if(b[i].equals("*")){
                //可以匹配routingKey的任意单个字符
                //直接向后走就行:
                i++;
                j++;
            }else if(b[i].equals("#")){
                //匹配routingKey的任意个任意字符
                if(i==n1-1){
                    //当bindingKey的最后一个字符为#时,可以匹配routingKey后面的所有字符串,直接返回true即可:
                    return true;
                }else{
                    //当b的#不是最后一个字符时,就找r之后的字符串中是否有b的下一个字符串的下标,当找不到时,就返回-1:
                    i++;
                    j = checkNext(b[i],r,j);
                    //当在r中找不到b的下一个字符串时,一定匹配失败,直接返回
                    if(j==-1) return false;
                    else{
                        i++;
                        j++;
                    }
                }
            }else{
                //b为普通字符串时
                if(!b[i].equals(r[j])) return false;
                else {
                    i++;
                    j++;
                }
            }
        }
        //b / r有一个已经匹配到结尾了,只有两个都完全匹配完,才算匹配成功
        if(i!=n1 || j!=n2){
            return false;
        }
        return true;
    }
    private int checkNext(String next, String[] r, int j) {
        for(int k=j;k<r.length;k++){
            if(r[k].equals(next)) return k;
        }
        return -1;
    }

}

对Router类的TOPIC匹配进行测试:

@SpringBootTest
public class RouterTopicTest {
    private Router router = new Router();
    // [测试用例]
    // binding key          routing key         result
    // aaa                  aaa                 true
    // aaa.bbb              aaa.bbb             true
    // aaa.bbb              aaa.bbb.ccc         false
    // aaa.bbb              aaa.ccc             false
    // aaa.bbb.ccc          aaa.bbb.ccc         true
    // aaa.*                aaa.bbb             true
    // aaa.*.bbb            aaa.bbb.ccc         false
    // *.aaa.bbb            aaa.bbb             false
    // #                    aaa.bbb.ccc         true
    // aaa.#                aaa.bbb             true
    // aaa.#                aaa.bbb.ccc         true
    // aaa.#.ccc            aaa.ccc             true
    // aaa.#.ccc            aaa.bbb.ccc         true
    // aaa.#.ccc            aaa.aaa.bbb.ccc     true
    // #.ccc                ccc                 true
    // #.ccc                aaa.bbb.ccc         true

    @Test
    void test01() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC, "aaa", "aaa");
        Assertions.assertTrue(ok);
    }

    @Test
    void test02() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bbb", "aaa.bbb");
        Assertions.assertTrue(ok);
    }

    @Test
    void test03() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bbb.ccc", "aaa.bbb");
        Assertions.assertFalse(ok);
    }

    @Test
    void test04() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.ccc", "aaa.bbb");
        Assertions.assertFalse(ok);
    }

    @Test
    void test05() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb.cc", "aaa.bb.cc");
        Assertions.assertTrue(ok);
    }
    @Test
    void test06() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb", "aaa.*");
        Assertions.assertTrue(ok);
    }

    @Test
    void test07() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb.cc", "aaa.*.bb");
        Assertions.assertFalse(ok);
    }

    @Test
    void test08() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb", "*.aaa.bb");
        Assertions.assertFalse(ok);
    }
    @Test
    void test09() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb.cc", "#");
        Assertions.assertTrue(ok);
    }
    @Test
    void test10() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb", "aaa.#");
        Assertions.assertTrue(ok);
    }
    @Test
    void test11() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb.cc", "aaa.#");
        Assertions.assertTrue(ok);
    }

    @Test
    void test12() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.cc", "aaa.#.cc");
        Assertions.assertTrue(ok);
    }

    @Test
    void test13() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb.cc", "aaa.#.cc");
        Assertions.assertTrue(ok);
    }
    @Test
    void test14() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.aaa.bb.cc", "aaa.#.cc");
        Assertions.assertTrue(ok);
    }
    @Test
    void test15() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "cc", "#.cc");
        Assertions.assertTrue(ok);
    }    @Test
    void test16() throws MqException {
        boolean ok = router.isRouting(ExchangeType.TOPIC,
                "aaa.bb.cc", "#.cc");
        Assertions.assertTrue(ok);
    }

发送消息:

//发送消息到队列
    public boolean basicPublish(String exchangeName,String routingKey,
                                BasicProperties basicProperties,byte[] body){
        exchangeName = virtualHostName + exchangeName;
        try {
            //1.判断交换机是否存在
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if(exchange==null){
                throw new MqException("[VirtualHost] 交换机不存在 exchangeName:"+exchangeName);
            }
            //2.判断routingKey格式是否正确
            boolean ok = router.checkRoutingKey(routingKey);
            if(!ok) {
                throw new MqException("[VirtualHost] routingKey格式有误 routingKey:"+routingKey);
            }
            //3.根据交换机的类型进行路由匹配,分发消息
            if(exchange.getType()==ExchangeType.DIRECT){
                //直接交换机,routingKey就是队列名,bindingKey无用,将消息路由到指定的队列上
                //获取到指定队列
                String queueName = virtualHostName + routingKey;
                MSGQueue queue = memoryDataCenter.getQueue(queueName);
                if(queue==null){
                    throw new MqException("[VirtualHost] 队列不存在 queueName:"+queueName);
                }
                //构造消息对象
                Message message = new Message();
                message = message.createMessageById(null,basicProperties,body);
                //发送消息到队列,再构造一个方法实现
                sendMessage(queue,message);
            }else{
                //当交换机类型为fanout/topic时:
                //遍历交换机所有的绑定
                ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
                for(Binding b:bindings.values()){
                    MSGQueue queue = memoryDataCenter.getQueue(b.getQueueName());
                    //判断交换机绑定的队列是否存在:
                    if(queue==null){
                        System.out.println("[VirtualHost] 队列不存在 queueName:"+b.getQueueName());
                        continue;
                    }
                    //构造消息对象
                    Message message = new Message().createMessageById(routingKey, basicProperties, body);
                    //判断routingKey与binding是否成功
                    if(!router.isRouting(exchange.getType(),message.getRoutingKey(),b.getBindingKey())){
                       //匹配失败:
                        System.out.println("[VirtualHost] routingKey和BindingKey不匹配 routingKey:"+routingKey+
                                " , bindingKey:"+b.getBindingKey());
                        continue;
                    }
                    //匹配成功时,就将消息转发
                    sendMessage(queue,message);
                    System.out.println("[VirtualHost] 消息发送成功 queueName:"+queue.getName()+
                            " ,messageId:"+message.getMessageId());
                }
            }
            return true;
        }catch (Exception e){
            System.out.println("[VirtualHost]消息发送失败 ");
            e.printStackTrace();
            return false;
        }
    }
    //消费者管理对象:
    private ConsumerManager consumerManager = new ConsumerManager(this);

    private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {
            //存入磁盘
            //是否持久化
            //1:持久化 0:非持久化
            if(message.getDeliveryMode()==1){
                diskDataManager.sendMessage(queue,message);
            }
            //存入内存:
            memoryDataCenter.sendMessage(queue,message);
            //消息已经到达队列,通知订阅队列的消费者消费消息
            consumerManager.notifyConsumer(queue.getName());
            System.out.println("[VirtualHost] 发送消息成功");
    }

创建ConsumerManager类:

对消费者进行管理:


/**
 * 消费者管理类
 */
public class ConsumerManager {
    //持有上层的VirtualHost对象的引用,用来操作数据
    private VirtualHost virtualHost;
    //    使⽤⼀个线程池⽤来执⾏消息回调
    private ExecutorService workerPool = Executors.newFixedThreadPool(4);
    //存放令牌(队列名)的队列:那个队列当前有消息了,就将队列名加入到阻塞队列中
    //然后扫描线程通过该队列中存放的队列名找到对应的消息和订阅者,将信息打包放到线程池中进行消费
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
    //扫描线程
    private Thread scannerThread = null;

    //通知消费者消费消息:
    //调用时机:发送方发送消息成功后,
    //当队列中有消息了,就将其放到阻塞队列中,然后就要通知消费者消费消息了
    public void notifyConsumer(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }
}

订阅消息:

    /**
    //订阅消息
    //添加一个订阅者:
     * @param consumerTag 消费者身份标识
     * @param queueName 队列名
     * @param autoAck 是否自动确认消息
     * @param consumer 回调函数
     * @return
     */
    public boolean basicConsume(String consumerTag, String queueName,
                                 boolean autoAck, Consumer consumer){
        queueName = virtualHostName + queueName;
        try {
            //通过消费者管理类实现添加消费者功能
            consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
            System.out.println("[VirtualHost] basicConsumer 成功 queueName:"+queueName);
            return true;
        }  catch (MqException e) {
            System.out.println("[VirtualHost] basicConsumer 失败 queueName:"+queueName);
            e.printStackTrace();
            return false;
        }
    }

创建ConsumerEnv类:

消费者完整环境类:


/**
 * 表示消费者(完整的执行环境)
 */
@Data
public class ConsumerEnv {
    //消费者唯一标识
    private String consumerTag;
    //订阅队列的队列名字
    private String queueName;
    //是否自动确认消息
    private boolean autoAck;
    //要执行的具体功能,通过一个接口,由调用者自己实现其方法体
    private Consumer consumer;
}

创建Consumer接口:

实现消费者的回调函数接口:通过lambda表达式,让消费者自己实现对消息的处理

/**
 * 函数式接口,回调函数,当消费者收到消息后,要处理消息,调用者通过这个接口实现具体的功能
 */
@FunctionalInterface
public interface Consumer {
    //deliver:投递的意思,这个方法在每次服务器收到发送来的消息后,调用
    //通过这个方法把消息推送给对应的消费者
    void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
}

为MSGQueue类新增加二个属性和方法,用于管理订阅队列的消费者集合:


    //此处再添加一个属性:订阅该队列的消费者集合
    private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
    //当订阅队列的消费者不止一个时 , 规定以轮训的方式消费消息
    //再添加一个属性,记录当前轮到哪个消费者消费消息了
    //这里使用AtomicInteger类来实现,目的是不让手动修改,且要实现自增的功能
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    //添加一个新的订阅者(消费者)
    public void addConsumerEnv(ConsumerEnv consumerEnv){
        consumerEnvList.add(consumerEnv);
    }

    //挑选一个订阅者,消费当前消息,按照轮训的方式
    public ConsumerEnv chooseConsumerEnv(){
        if(consumerEnvList.isEmpty()){
            //当前该队列还没有消费者订阅
            System.out.println("[MSGQueue] 当前该队列没有订阅者");
            return null;
        }
        //按照轮训的方式获取一个要消费消息的订阅者下标
        int index = atomicInteger.get()%consumerEnvList.size();
        //让轮训值 自增
        atomicInteger.getAndIncrement();
        return consumerEnvList.get(index);
    }

在ConsumerManager类中实现添加消费者方法:

 //添加新的消费者,并消费队列中当前存在的消息
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        //1.找到对应的队列
        MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
        if(queue==null){
            throw new MqException("[ConsumerManager] 队列不存在 queueName:"+queueName);
        }
        //2.创建一个消费者
        ConsumerEnv consumerEnv = new ConsumerEnv();
        consumerEnv.setConsumerTag(consumerTag);
        consumerEnv.setQueueName(queueName);
        consumerEnv.setAutoAck(autoAck);
        consumerEnv.setConsumer(consumer);
        //3.将订阅者加入到队列的订阅者队列中
        queue.addConsumerEnv(consumerEnv);
        //4.当队列中已经有一些消息时,要将其消费掉
        synchronized (queue){
            int n = virtualHost.getMemoryDataCenter().getMessageCountFromQueue(queueName);
            for(int i=0;i<n;i++){
                //这个方法调用一次就消费一条消息
                consumerMessage(queue);
            }
        }
    }
    //  消费消息:调用消息的回调函数,并将消息从队列中删除
    //从队列中获取一个消息,并让消费者消费,
    // 当消费者不止一个时,按照轮训的方式让消费者依次消费消息
    private void consumerMessage(MSGQueue queue) throws MqException {
        //1.从队列的订阅者中挑选一个订阅者
        ConsumerEnv consumerEnv = queue.chooseConsumerEnv();
        if(consumerEnv==null){
            //当前队列号没有订阅者,无法消费消息
            System.out.println("[ConsumerManager] 当前队列中还没有订阅者");
            return;
        }
        //2.消费消息
        Message message = virtualHost.getMemoryDataCenter().pollMessage(queue.getName());
        if(message==null){
            //当前队列中还没有消息,不需要消费
            System.out.println("当前队列中还没有消息");
            return;
        }
        //将消息带到消费者的回调方法中,给线程池执行
        workerPool.submit(()->{
            try{
                //1.在执行回调之前,先将消息放到待确认队列集合中,一旦消息被消费失败了.就重新发送消息
                virtualHost.getMemoryDataCenter().sendWaitMessage(queue.getName(),message);
                //2.执行订阅者的回调方法
                consumerEnv.getConsumer().handlerDeliver(consumerEnv.getConsumerTag(),message.getBasicProperties(),message.getBody());
                //3.根据消费者的确认消息方式及消费者消费消息的情况,执行删除消息操作
                //  这里完成为自动确认模式下的操作,手动模式下,在basicAck方法中实现
                if(consumerEnv.isAutoAck()){
                    //4.删除磁盘中的数据
                    //  是否持久化
                    //  1:非持久化 0:持久化
                    if(message.getDeliveryMode()==0) {
                        virtualHost.getDiskDataManager().deleteMessageFromQueue(queue, message);
                    }
                    //5.删除未确认消息队列中的消息
                    virtualHost.getMemoryDataCenter().deleteWaitMessage(queue.getName(), message.getMessageId());
                    //6.删除消息集合中的消息
                    virtualHost.getMemoryDataCenter().deleteMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费 ");
                }
            }catch (Exception e){
                System.out.println("[ConsumerManager] 消费消息失败");
                e.printStackTrace();
            }
        });
    }

在ConsumerManager类中,添加扫描线程,不停扫描阻塞令牌队列,查看是否有新的消息到来,需要消费者及时消费:

//先获取到令牌,根据令牌找到指定的队列,从队列中获取消息进行消费
    public ConsumerManager(VirtualHost parent){
        virtualHost = parent;
        //为推的模式.不断的扫描令牌队列,一但有消息进入队列,就将其推送给消费者
        Thread t = new Thread(()->
        {
            while(true){
                try {
                    //1.获取令牌
                    String queueName = tokenQueue.take();
                    //2.根据令牌,找到指定的队列
                    MSGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
                    if(queue==null){
                        throw new MqException("[ConsumerManager] 获取令牌时,发现队列不存在");
                    }
                    synchronized (queue){
                        //3.从队列中获取一个消息并进行消费
                        consumerMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        //将线程设为后台线程
        //当前台线程执行结束了,后台线程也就结束了,
        //若设为前台线程,那么只有当前台线程执行完了,整个进程才会结束,
        // 这里的循环是while(true)会一直卡着执行结束不了,因此要设成后台线程
        t.setDaemon(true);
        //启动线程
        t.start();
    }

十三.网络通信协议设计

生产者和消费者都是客户端,需要通过网络和消息队列服务器进行通信.

此处我们使⽤TCP协议,来作为通信的底层协议.同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服 务器这边功能的远程调⽤.

设计应用层协议:

使⽤⼆进制的⽅式设定协议.

请求数据格式:

响应数据格式:

其中 type 表⽰请求响应不同的功能. 取值如下:

• 0x1 创建 channel

• 0x2 关闭 channel

• 0x3 创建 exchange

• 0x4 销毁 exchange

• 0x5 创建 queue

• 0x6 销毁 queue

• 0x7 创建 binding

• 0x8 销毁 binding

• 0x9 发送 message

• 0xa 订阅 message

• 0xb 返回 ack

• 0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的

对于请求来说,payload是各种请求方法的参数信息

对响应来说,payload是方法的返回数据信息.

创建request类:


/**
 * 表示一个网络通信中的请求对象
 */
@Data
public class Request {
    /** type 表⽰请求响应不同的功能. 取值如下
     *  0x1  创建 channel
     * • 0x2  关闭 channel
     * • 0x3  创建 exchange
     * • 0x4  销毁 exchange
     * • 0x5  创建 queue
     * • 0x6  销毁 queue
     * • 0x7  创建 binding
     * • 0x8  销毁 binding
     * • 0x9  发送 message
     * • 0xa  订阅 message
     * • 0xb  返回 ack
     * • 0xc  服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
     */
    //请求类型,设定占4字节
    private int type;
    //请求的数据长度,占4字节
    private int length;
    //请求体 payload 表⽰这次⽅法调⽤的各种参数信息
    private byte[] payload;
}

创建response类:

/**
 * 表示一个响应对象
 */
@Data
public class Response {
    //按照自己的定义,响应类型,4字节
    private int type;
    //响应的数据长度,4字节
    private int length;
    //响应体
    private byte[] payload;
}

创建参数父类:

//定义参数⽗类
//构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
//不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再
//通过继承的⽅式体现
@Data
public class BasicArgs implements Serializable {
    //表示一次请求的身份标识,用来和该请求 对应的返回的响应相对照
    protected String rid;
    //每一次请求需要建立连接,通过TCP建立连接,一个连接可以发送多次消息,每条消息通过信道传送
    //一条信道可以发送多条消息
    //这次通信的信道channel的身份标识
    protected String channelId;
}

创建响应父类:

/**
 * 定义payload的返回数据
 */
@Data
public class BasicReturns implements Serializable {
    //一次请求或相应的身份标识
    protected String rid;
    //标识一个channel
    protected String channelId;
    //表示方法的执行结果  payload 表⽰这次⽅法调⽤的返回值.
    protected boolean ok;
}

创建设备功能的参数类:

exchangeDeclareArgs:

/**
 * 这个类表示调用声明交换机方法的参数
 */
@Data
public class ExchangeDeclareArgs extends BasicArgs implements Serializable {
    private String exchangeName;
    private ExchangeType type;
    private boolean isDurable;
    private boolean autoDelete;
    private Map<String,Object> args;
}

exchangeDeleteArgs:

@Data
public class ExchangeDeleteArgs extends BasicArgs implements Serializable {
    private String exchangeName;
}

queueDeclareArgs:

@Data
public class QueueDeclareArgs extends BasicArgs implements Serializable {
    private String queueName;
    private boolean isDurable;
    private boolean autoDelete;
    private Map<String,Object> args;
}

queueDeleteArgs:

@Data
public class QueueDeleteArgs extends BasicArgs implements Serializable {
    private String queueName;
}

bindingDeclareArgs:

@Data
public class BindingDeclareArgs extends BasicArgs implements Serializable {
    private String ExchangeName;
    private String queueName;
    private String bindingKey;
}

bindingDeleteArgs:

@Data
public class BindingDeleteArgs extends BasicArgs implements Serializable {
    private String exchangeName;
    private String queueName;
}

basicPublishArgs:

@Data
public class BasicPublishArgs extends BasicArgs implements Serializable {
    private String exchangeName;
    private String routingKey;
    private BasicProperties basicProperties;
    private byte[] body;
}

basicConsumerArgs:

@Data
public class BasicConsumerArgs extends BasicArgs implements Serializable {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
    //这个类对应的BasicConsumer方法还有一个参数 consumer,是一个回到参数
    //消费者客户端收到服务器发送的消息后,针对自己的业务,实现这个回调接口就行了,
    //无需再将回调参数传给服务器,因此解救不需要在这里写这个参数了
    //并且,这个 回调参数也无法通过网络传输给服务器
}

basicAckArgs:

/**
 * 手动响应数据
 */
@Data
public class BasicAckArgs extends BasicArgs implements Serializable {
    private String queueName;
    private String messageId;
}

subscribeReturns:

/**
 * 这里类表示返回数据的具体参数
 * 是服务器给消费者提供的订阅消息
 * consumerTag其实是channelId.
 * basicProperties和body共同构成了Message.
 */
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {
    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;
}

十四.实现BrokerServer

public class BrokerServer {
    //调用相关数据
    private VirtualHost virtualHost = new VirtualHost("default");
    //服务器⾃⾝的 socket
    private ServerSocket serverSocket = null;
    //引入线程池,处理多个客户端的请求
    private ExecutorService executorService = null;
    //引入一个哈希表,存储所有的会话对象
    //key: channelId, val:socket对象
    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
    //引入一个布尔变量,表示当前服务器是否要停止,
    //要对所有线程是立即可见的,用volatile修饰
    private volatile boolean runnable = true;

    public BrokerServer(int port) throws IOException {
        serverSocket = new ServerSocket(9090);
    }


    //启动服务
    public void start() throws IOException {
        System.out.println("[BrokerServer] 启动服务");
        executorService = Executors.newCachedThreadPool();
        try {
            while (runnable) {
                //accept:不断接收客户端发来的请求:
                Socket clientSocket = serverSocket.accept();
                executorService.submit(() -> {
                    processConnection(clientSocket);
                });
            }
        } catch (SocketException e) {
            //正常结束
            System.out.println("[BrokerServer] 服务器停止运行!");
        }
    }

    //停止服务器
    public void stop() throws IOException {
        runnable = false;
        executorService.shutdown();
        serverSocket.close();
    }

    //处理一个客户端的连接
    //一个个连接可能有多次的请求和相应
    //要读取数据,处理数据,然后将结果返回给客户端
    private void processConnection(Socket clientSocket) {
        try (InputStream inputStream = clientSocket.getInputStream();
             OutputStream outputStream = clientSocket.getOutputStream()) {
            //  这里需要按照特定格式进行读取和解析数据
            try (DataInputStream dataInputStream = new DataInputStream(inputStream);
                 DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                while (true) {
                    //1.读取请求
                    Request request = readRequest(dataInputStream);
                    //2.根据请求计算相应
                    Response response = process(request, clientSocket);
                    //3.将结果返回给客户端
                    writeResponse(dataOutputStream,response);
                }
            } catch (EOFException | SocketException e) {
                //当出现这两种异常时,是正常的异常,是请求读取结束了,读到了空字符串抛出的异常,
                // 正常结束循环就可以了
                System.out.println("[BrokerServer] connection 连接关闭 ,客户端地址: " + clientSocket.getInetAddress().toString()
                        + " : " + clientSocket.getPort());
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            } catch (MqException e) {
                throw new RuntimeException(e);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            //关闭资源
            try {
                //当前连接处理完之后,需要关闭Socket
                clientSocket.close();
                //把当前socket对应的所有channel也删除了
                clearCloseSessions(clientSocket);
            } catch (IOException e) {
                e.printStackTrace();
            }
            //删除sessions中客户端和服务器建立的连接
        }
    }

    private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
        request.setType(dataInputStream.readInt());
        request.setLength(dataInputStream.readInt());
        byte[] payload = new byte[request.getLength()];
        int n = dataInputStream.read(payload);
        if (n != request.getLength()) {
            throw new IOException("读取请求格式出错!");
        }
        request.setPayload(payload);
        return request;
    }

    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
        dataOutputStream.writeInt(response.getType());
        dataOutputStream.writeInt(response.getLength());
        dataOutputStream.write(response.getPayload());
        // 这个刷新缓冲区也是重要的操作!!
        dataOutputStream.flush();
    }

    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. 把 request 中的 payload 做一个初步的解析.
        BasicArgs BasicArgs = (BasicArgs) BinaryTool.fromBytes(request.getPayload());
        System.out.println("[Request] rid=" + BasicArgs.getRid() + ", channelId=" + BasicArgs.getChannelId()
                + ", type=" + request.getType() + ", length=" + request.getLength());
        // 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
        boolean ok = true;
        if (request.getType() == 0x1) {
            // 创建 channel
            sessions.put(BasicArgs.getChannelId(), clientSocket);
            System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + BasicArgs.getChannelId());
        } else if (request.getType() == 0x2) {
            // 销毁 channel
            sessions.remove(BasicArgs.getChannelId());
            System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + BasicArgs.getChannelId());
        } else if (request.getType() == 0x3) {
            // 创建交换机. 此时 payload 就是 ExchangeDeclareArgs 对象了.
            ExchangeDeclareArgs Args = (ExchangeDeclareArgs) BasicArgs;
            ok = virtualHost.exchangeDeclare(Args.getExchangeName(), Args.getType(),
                    Args.isDurable(), Args.isAutoDelete(), Args.getArgs());
        } else if (request.getType() == 0x4) {
            ExchangeDeleteArgs Args = (ExchangeDeleteArgs) BasicArgs;
            ok = virtualHost.exchangeDelete(Args.getExchangeName());
        } else if (request.getType() == 0x5) {
            QueueDeclareArgs Args = (QueueDeclareArgs) BasicArgs;
            ok = virtualHost.queueDeclare(Args.getQueueName(), Args.isDurable(), Args.isAutoDelete(), Args.getArgs());
        } else if (request.getType() == 0x6) {
            QueueDeleteArgs Args = (QueueDeleteArgs) BasicArgs;
            ok = virtualHost.queueDelete((Args.getQueueName()));
        } else if (request.getType() == 0x7) {
            BindingDeclareArgs Args = (BindingDeclareArgs) BasicArgs;
            ok = virtualHost.bindingDeclare(Args.getQueueName(), Args.getExchangeName(), Args.getBindingKey());
        } else if (request.getType() == 0x8) {
            BindingDeleteArgs Args = (BindingDeleteArgs) BasicArgs;
            ok = virtualHost.bindingDelete(Args.getQueueName(), Args.getExchangeName());
        } else if (request.getType() == 0x9) {
            BasicPublishArgs Args = (BasicPublishArgs) BasicArgs;
            ok = virtualHost.basicPublish(Args.getExchangeName(), Args.getRoutingKey(),
                    Args.getBasicProperties(), Args.getBody());
        } else if (request.getType() == 0xa) {
            BasicConsumerArgs Args = (BasicConsumerArgs) BasicArgs;
            ok = virtualHost.basicConsume(Args.getConsumerTag(), Args.getQueueName(), Args.isAutoAck(),
                    new Consumer() {
                        //这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端
                        //此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询,
                        //  就可以得到对应的socket 对象了, 从而可以往里面发送数据了
                        @Override
                        public void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                            // 先知道当前这个收到的消息, 要发给哪个客户端.
                            // 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
                            // socket 对象了, 从而可以往里面发送数据了
                            // 1. 根据 channelId 找到 socket 对象
                            Socket clientSocket = sessions.get(consumerTag);
                            if (clientSocket == null || clientSocket.isClosed()) {
                                throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
                            }
                            // 2. 构造响应数据
                            SubScribeReturns subScribeReturns = new SubScribeReturns();
                            subScribeReturns.setChannelId(consumerTag);
                            subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
                            subScribeReturns.setOk(true);
                            subScribeReturns.setConsumerTag(consumerTag);
                            subScribeReturns.setBasicProperties(basicProperties);
                            subScribeReturns.setBody(body);
                            byte[] payload = BinaryTool.toByte(subScribeReturns);
                            Response response = new Response();
                            // 0xc 表示服务器给消费者客户端推送的消息数据.
                            response.setType(0xc);
                            // response 的 payload 就是一个 SubScribeReturns
                            response.setLength(payload.length);
                            response.setPayload(payload);
                            // 3. 把数据写回给客户端.
                            //    注意! 此处的 dataOutputStream 这个对象不能 close !!!
                            //    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
                            //    此时就无法继续往 socket 中写入后续数据了.
                            DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                            writeResponse(dataOutputStream, response);
                        }
                    });
        } else if (request.getType() == 0xb) {
            // 调用 basicAck 确认消息.
            BasicAckArgs Args = (BasicAckArgs) BasicArgs;
            ok = virtualHost.basicAck(Args.getQueueName(), Args.getMessageId());
        } else {
            // 当前的 type 是非法的.
            throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
        }
        // 3. 构造响应
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(BasicArgs.getChannelId());
        basicReturns.setRid(BasicArgs.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toByte(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);
        System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
                + ", type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

    private void clearCloseSessions(Socket clientSocket) {
        // 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉.
        List<String> toDeleteChannelId = new ArrayList<>();
        for (Map.Entry<String, Socket> entry : sessions.entrySet()) {
            if (entry.getValue() == clientSocket) {
                // 不能在这里直接删除!!!
                // 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!!
                // sessions.remove(entry.getKey());
                toDeleteChannelId.add(entry.getKey());
            }
        }
        for (String channelId : toDeleteChannelId) {
            sessions.remove(channelId);
        }
        System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId);
    }


}

十五.实现客户端

创建ConnectionFactory.

表示用来创建连接的工厂类:

/**
 *连接工厂
 */
@Data
public class ConnectionFactory {
        // broker server 的 ip 地址
        private String host;
        // broker server 的端口号
        private int port;

        public Connection newConnection() throws IOException {
            Connection connection = new Connection(host, port);
            return connection;
        }
}

创建Connection类:

一个Connection对应一个TCP,一个连接可以包含多个channel.

public class Connection {
    private Socket socket = null;
    private InputStream inputStream;
    private OutputStream outputStream;
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;
    //创建线程池,用来处理客户端这边执行用户回调的线程池
    private ExecutorService callbackPool = null;

    //  创建一个hash.来管理多个channel
    ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();

    //这个方法在客户端构造好请求后,调用,用来发送请求到服务器:
    public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());

    }
// 和服务器建立连接,接收服务器返回的响应,并处理响应
    public Connection(String host,int port) throws IOException {
        socket = new Socket(host,port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);

        callbackPool = Executors.newFixedThreadPool(4);
//      创建一个扫描线程,不断的从socket中读取响应,交给对应的channel进行处理
        Thread t = new Thread(()->{
            try{
                while (!socket.isClosed()){
                    Response response = readResponse();
                    //处理响应
                    dispatchResponse(response);
                }
            } catch (SocketException e){
                //连接正常断开
                System.out.println("[Connection] 连接正常断开");
            }catch (IOException | ClassNotFoundException | MqException e) {
                System.out.println("[Connection] 连接异常断开");
                e.printStackTrace();
            }
        });
        t.start();
    }

    public void close(){
        try{
            //关闭Connection ,释放资源
            callbackPool.shutdownNow();
            channelMap.clear();
            outputStream.close();
            inputStream.close();
            socket.close();;
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    // 读取服务器返回的响应
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if (n != response.getLength()) {
            throw new IOException("读取的响应数据不完整!");
        }
        response.setPayload(payload);
        System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

    // 使用这个方法来分别处理响应, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.
    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if (response.getType() == 0xc) {
            // 服务器推送给消费者客户端的消息数据
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            // 根据 channelId 找到对应的 channel 对象
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
            }
            // 执行该 channel 对象内部的回调.
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handlerDeliver(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch (MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            // 当前响应是针对刚才的控制请求的响应
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            // 把这个结果放到对应的 channel 的 hash 表中.
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
            }
            //获取到响应后,将其放到响应的集合中,让客户端从集合中取走对应的响应.
            channel.putReturns(basicReturns);
        }
    }
// 通过这个方法, 在 Connection 中能够创建出一个 Channel
    public Channel createChannel() throws IOException {
        String channelId = "C-" + UUID.randomUUID().toString();
        Channel channel = new Channel(channelId, this);
        // 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.
        channelMap.put(channelId, channel);
        // 同时也需要把 "创建 channel" 的这个消息也告诉服务器.
        boolean ok = channel.createChannel();
        if (!ok) {
            // 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!
            // 把刚才已经加入 hash 表的键值对, 再删了.
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }
}

创建Channel类:

用于客户端发送请求调用的相关的API:

@Data
public class Channel {
    private String channelId;
    // 当前这个 channel 属于哪个连接.
    private Connection connection;
    // 用来存储后续客户端收到的服务器的响应.
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    // 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.
    // 此处约定一个 Channel 中只能有一个回调.
    private Consumer consumer = null;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }
    /**   type 表⽰请求响应不同的功能. 取值如下
     *  0x1  创建 channel
     * • 0x2  关闭 channel
     * • 0x3  创建 exchange
     * • 0x4  销毁 exchange
     * • 0x5  创建 queue
     * • 0x6  销毁 queue
     * • 0x7  创建 binding
     * • 0x8  销毁 binding
     * • 0x9  发送 message
     * • 0xa  订阅 message
     * • 0xb  返回 ack
     * • 0xc  服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
     */
    // 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.
    public boolean createChannel() throws IOException {
        // 对于创建 Channel 操作来说, payload 就是一个 basicArgs 对象
        BasicArgs basicArgs = new BasicArgs();
        basicArgs.setChannelId(channelId);
        basicArgs.setRid(generateRid());
        byte[] payload = BinaryTool.toByte(basicArgs);

        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);

        // 构造出完整请求之后, 就可以发送这个请求了.
        connection.writeRequest(request);
        // 等待服务器的响应
        //服务器对根据请求处理并返回响应,对请求的处理时间不确定,
        // 该步骤可能会发生阻塞
        BasicReturns basicReturns = waitResult(basicArgs.getRid());
        return basicReturns.isOk();
    }
    // 通过UUID,生成唯一rid
    private String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }

    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while ((basicReturns = basicReturnsMap.get(rid)) == null) {
            // 如果查询结果为 null, 说明包裹还没回来.
            // 此时就需要阻塞等待.
            synchronized (this) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // 读取成功之后, 还需要把这个消息从哈希表中删除掉.
        basicReturnsMap.remove(rid);
        return basicReturns;
    }

    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            // 当前也不知道有多少个线程在等待上述的这个响应.
            // 把所有的等待的线程都唤醒.
            notifyAll();
        }
    }

    // 关闭 channel, 给服务器发送一个 type = 0x2 的请求
    public boolean close() throws IOException {
        BasicArgs basicArgs = new BasicArgs();
        basicArgs.setRid(generateRid());
        basicArgs.setChannelId(channelId);
        byte[] payload = BinaryTool.toByte(basicArgs);

        Request request = new Request();
        request.setType(0x2);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicArgs.getRid());
        return basicReturns.isOk();
    }

    // 创建交换机
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, 
               boolean durable, boolean autoDelete, Map<String, Object> Args) throws IOException {
        ExchangeDeclareArgs exchangeDeclareArgs = new ExchangeDeclareArgs();
        exchangeDeclareArgs.setRid(generateRid());
        exchangeDeclareArgs.setChannelId(channelId);
        exchangeDeclareArgs.setExchangeName(exchangeName);
        exchangeDeclareArgs.setType(exchangeType);
        exchangeDeclareArgs.setDurable(durable);
        exchangeDeclareArgs.setAutoDelete(autoDelete);
        exchangeDeclareArgs.setArgs(Args);
        byte[] payload = BinaryTool.toByte(exchangeDeclareArgs);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeclareArgs.getRid());
        return basicReturns.isOk();
    }

    // 删除交换机
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArgs Args = new ExchangeDeleteArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }

    // 创建队列
    public boolean queueDeclare(String queueName, boolean durable, boolean autoDelete,
                                Map<String, Object> Args) throws IOException {
        QueueDeclareArgs queueDeclareArgs = new QueueDeclareArgs();
        queueDeclareArgs.setRid(generateRid());
        queueDeclareArgs.setChannelId(channelId);
        queueDeclareArgs.setQueueName(queueName);
        queueDeclareArgs.setDurable(durable);
        queueDeclareArgs.setAutoDelete(autoDelete);
        queueDeclareArgs.setArgs(Args);
        byte[] payload = BinaryTool.toByte(queueDeclareArgs);

        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeclareArgs.getRid());
        return basicReturns.isOk();
    }

    // 删除队列
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArgs Args = new QueueDeleteArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setQueueName(queueName);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }

    // 创建绑定
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
        BindingDeclareArgs Args = new BindingDeclareArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setQueueName(queueName);
        Args.setExchangeName(exchangeName);
        Args.setBindingKey(bindingKey);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }

    // 解除绑定
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        BindingDeleteArgs Args = new BindingDeleteArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setQueueName(queueName);
        Args.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }

    // 发送消息
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        BasicPublishArgs Args = new BasicPublishArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setExchangeName(exchangeName);
        Args.setRoutingKey(routingKey);
        Args.setBasicProperties(basicProperties);
        Args.setBody(body);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }

    // 订阅消息
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
        // 先设置回调.
        if (this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
        }
        this.consumer = consumer;

        BasicConsumerArgs Args = new BasicConsumerArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.
        Args.setQueueName(queueName);
        Args.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }

    // 确认消息
    public boolean basicAck(String queueName, String messageId) throws IOException {
        BasicAckArgs Args = new BasicAckArgs();
        Args.setRid(generateRid());
        Args.setChannelId(channelId);
        Args.setQueueName(queueName);
        Args.setMessageId(messageId);
        byte[] payload = BinaryTool.toByte(Args);

        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(Args.getRid());
        return basicReturns.isOk();
    }
}

客户端代码测试:


@SpringBootTest
public class MqClientTest {
    private BrokerServer brokerServer = null;
    private ConnectionFactory factory = null;
    private Thread t = null;

    @BeforeEach
    public void setUp() throws IOException {
        // 1. 先启动服务器
        Mq02Application.context = SpringApplication.run(Mq02Application.class);
        brokerServer = new BrokerServer(9090);
        t = new Thread(() -> {
            // 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!
            try {
                brokerServer.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t.start();

        // 2. 配置 ConnectionFactory
        factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
    }

    @AfterEach
    public void tearDown() throws IOException {
        // 停止服务器
        brokerServer.stop();
        // t.join();
        Mq02Application.context.close();

        // 删除必要的文件
        File file = new File("./data");
        FileUtils.deleteDirectory(file);

        factory = null;
    }

    @Test
    public void testConnection() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
    }
    @Test
    public void testChannel() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);
    }

    @Test
    public void testExchange() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);

        ok = channel.exchangeDelete("testExchange");
        Assertions.assertTrue(ok);

        // 此处稳妥起见, 把改关闭的要进行关闭.
        channel.close();
        connection.close();
    }

    @Test
    public void testQueue() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.queueDeclare("testQueue", true, false,  null);
        Assertions.assertTrue(ok);

        ok = channel.queueDelete("testQueue");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }

    @Test
    public void testBinding() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);
        ok = channel.queueDeclare("testQueue", true,  false, null);
        Assertions.assertTrue(ok);

        ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");
        Assertions.assertTrue(ok);

        ok = channel.queueUnbind("testQueue", "testExchange");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }

    @Test
    public void testMessage() throws IOException, MqException, InterruptedException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        Assertions.assertTrue(ok);
        ok = channel.queueDeclare("testQueue", true, false, null);
        Assertions.assertTrue(ok);

        byte[] requestBody = "hello".getBytes();
        ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);
        Assertions.assertTrue(ok);

        ok = channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                Assertions.assertArrayEquals(requestBody, body);
                System.out.println("[消费数据] 结束!");
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        channel.close();
        connection.close();
    }
}

完成

成果测试:

启动消息队列服务器:

//启动服务器:
        BrokerServer brokerServer = new BrokerServer(9090);
        brokerServer.start();

创建生产者 发送消息:

/**
 * 模拟生产者
 */
public class producer {
    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println("启动生产者");
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        //创建连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();

        //创建交换机 队列 绑定
        channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);
        channel.queueDeclare("queue",true,false,null);

        //发送消息
        boolean ok = channel.basicPublish("exchange", "queue",null,"hello".getBytes());
        System.out.println("消息发送成功: ok:"+ok);
        Thread.sleep(1000);
        //关闭资源
        channel.close();
        connection.createChannel();
    }
}

创建消费者消费消息:

/**
 * 模拟消费者
 */
public class consumer {
    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        System.out.println("消费者启动");
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("exchange", ExchangeType.DIRECT,true,false,null);
        channel.queueDeclare("queue",true,false,null);

        //接收消息
        boolean ok = channel.basicConsume("queue", true, new org.rabbitmq.mq02.common.Consumer() {
            @Override
            public void handlerDeliver(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("处理消息开始");
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("basicProperties:"+basicProperties);
                System.out.println("body:"+body.toString());
                System.out.println("处理消息结束");
            }
        });
        System.out.println("消费一条消息成功 ok:"+ok);
        // 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
        while (true) {
            Thread.sleep(500);
        }
    }
}

完结.

项目源码:

Admin/模拟实现消息队列 - Gitee.com


网站公告

今日签到

点亮在社区的每一天
去签到