第五章:队列管理模块

发布于:2025-02-26 ⋅ 阅读:(23) ⋅ 点赞:(0)

目录

第一节:代码实现

        1-1.MsgQueue类

        1-2.MsgQueueMapper类

        1-3.MsgQueueManager类

第二节:单元测试

下期预告:


        队列管理模块在mqserver目录下实现。

第一节:代码实现

        交换机管理模块和队列管理模块十分相似,因为它们的管理逻辑是一样的,都是使用sqlite数据库文件进行持久化管理,名字:信息 的映射进行内存管理。

        很多代码都可以直接拷贝,稍加修改即可。

        创建一个名为mq_msgqueue.hpp的文件,打开并防止重复包含、添加所需头文件、声明命名空间,这些内容可以在mq_exchange.hpp中拷贝:

#ifndef __M_MSGQUEUE_H__
#define __M_MSGQUEUE_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"

#include <google/protobuf/map.h>
#include <memory>
#include <iostream>
#include <unordered_map>

namespace zd
{};

#endif

        1-1.MsgQueue类

        同样先定义一个交换机类,它保存交换机的各种信息:

        (1)队列名称

        (2)持久化标志

        (3)独占标志:如果设置为true,那么队列最多只能有一个订阅者

        (4)自动删除标志:当队列没有绑定,没有订阅者时删除队列

        (5)其他参数:保存一些客户端可自定义的信息

        因为交换机也有 args 成员,所以它也需要 args 的两个处理接口,直接从 class Exchange 拷贝即可。

    // 1.定义消息队列类
    class MsgQueue
    { 
    public:
        using ptr = std::shared_ptr<MsgQueue>;
        
        std::string name; // 队列名称
        bool durable;     // 持久化标志
        bool exclusive;   // 独占标志
        bool autodelete;  // 自动删除标志
        google::protobuf::Map<std::string,std::string> args; // 其他参数

        MsgQueue(){}
        MsgQueue(const std::string& qname,bool qdurable,bool qexclusive,
                 bool qautodelete,
                 const google::protobuf::Map<std::string,std::string>& qargs):
        name(qname),
        durable(qdurable),
        exclusive(qexclusive),
        autodelete(qautodelete),
        args(qargs)
        {}

        // args在文件中会以 k1=v1&k2=v2&k3=v3 的形式存储
        // 读取上述内容后setArgs解析这个字符串,再存储到this->args中
        void setArgs(const std::string& str_args)
        {
            std::vector<std::string> sub_args;
            size_t ret = StrHelper::split(str_args,"&",sub_args);
            for(auto& str:sub_args)
            {
                size_t pos = str.find("=");
                std::string key = str.substr(0,pos);
                std::string val = str.substr(pos+1);
                args[key] = val;
            }
        }
        // 将this->args序列化成 k1=v1&k2=v2&k3=v3 的形式
        std::string getArgs()
        {
            std::string result;
            for(auto& it:args)
            {
                result+=it.first+"="+it.second+"&";
            }
            if(!result.empty())
                result.pop_back();
            return result;
        }
    };

        1-2.MsgQueueMapper类

        class MsgQueueMapper用于队列的持久化管理,在实现它之前,也需要先重定义一个unordered_map:

using MsgQueueMap = std::unordered_map<std::string,MsgQueue::ptr>;

        它提供的接口与class ExchageMapper基本一致,只是操作的表由 exchange_table 变成了 msgqueue_table,持久化数据也变化了一些:

    // 定义消息队列数据持久化管理类
    class MsgQueueMapper
    {
    public:
        MsgQueueMapper(const std::string& dbfile):
        _sql_helper(dbfile)
        {
            std::string path = FileHelper::parentDirectory(dbfile);
            FileHelper::createDirectory(path);
            assert(_sql_helper.open());
        }
        // 创建表
        void createTable()
        {
            #define CREATE_TABLE_ "create table if not exists msgqueue_table(\
                                  name varchar(32) primary key,\
                                  durable int,\
                                  exclusive int,\
                                  autodelete int,\
                                  args varchar(128));"
            bool ret = _sql_helper.exec(CREATE_TABLE_,nullptr,nullptr);
            if(ret == false)
            {
                LOG("创建消息队列数据库表失败!");
                abort(); // 异常退出
            }
        } 
        // 删除表
        void removeTable()
        {
            #define DROP_TABLE_ "drop table if exists msgqueue_table;"
            bool ret = _sql_helper.exec(DROP_TABLE_,nullptr,nullptr);
            if(ret == false)
            {
                LOG("删除消息队列数据库表失败!");
                abort(); // 异常退出
            }
        }
        // 插入一个消息队列
        bool insert(MsgQueue::ptr& exp)
        {
            #define INSERT_SQL_ "insert into msgqueue_table values('%s','%d','%d','%d','%s');"
            char sql_str[4096] = {0};
            sprintf(sql_str,INSERT_SQL_,exp->name.c_str(),exp->durable,exp->exclusive,exp->autodelete,exp->getArgs().c_str());
            bool ret = _sql_helper.exec(sql_str,nullptr,nullptr);
            if(ret == false)
            {
                LOG("数据库新增消息队列信息失败!");
                return false;
            }
            return true;
        }
        // 删除一个消息队列
        void remove(const std::string& qcname)
        {
            std::string DELE_SQL = "delete from msgqueue_table where name='"+qcname+"';";
            bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr);
            if(ret == false)
            {
                LOG("数据库删除消息队列信息失败!");
                abort(); // 异常退出
            }
        }
        // 从数据库中恢复消息队列数据
        std::unordered_map<std::string,MsgQueue::ptr> recovery()
        {
            createTable();

            MsgQueueMap result;
            std::string sql = "select name,durable,exclusive,autodelete,args from msgqueue_table;";
            // 它会根据数据库的数据量调整回调函数的调用次数,每次取出一套数据
            _sql_helper.exec(sql.c_str(),MsgQueueMapper::selectCallback,&result);
            return result;
        }
    private:
        static int selectCallback(void* arg,int numcol,char** row,char** fields)
        {
            MsgQueueMap* result = (MsgQueueMap*)arg;
            auto qup = std::make_shared<MsgQueue>();
            qup->name = row[0];
            qup->durable = (bool)std::stoi(row[1]);
            qup->exclusive = (bool)std::stoi(row[2]);
            qup->autodelete = (bool)std::stoi(row[3]);
            if(row[4])
            qup->setArgs(row[4]);
            result->insert(std::make_pair(qup->name,qup));
            return 0;
        }
    private:
        zd::SqliteHelper _sql_helper;
    };

        1-3.MsgQueueManager类

        它的实现与 class ExchangeManager 基本一致:

    class MsgQueueManager
    {
    public:
    
        using ptr = std::shared_ptr<MsgQueueManager>;

        MsgQueueManager(const std::string& dbfile):
        _mapper(dbfile)
        {
            _msgqueues = _mapper.recovery();
        }

        // 声明消息队列
        bool declareMsgQueue(const std::string& name,bool durable,bool exclusive,
                             bool autodelete,
                             const google::protobuf::Map<std::string,std::string>& args)
       {
           std::unique_lock<std::mutex> lock(_mtx);
           if(_msgqueues.find(name) != _msgqueues.end()) 
           {
               // 消息队列已经存在就不需要添加了
               return true;
           }
           auto qup = std::make_shared<MsgQueue>(name,durable,exclusive,autodelete,args);
           if(durable == true)
           {
               bool ret = _mapper.insert(qup);
               if(ret == false)
               {
                    LOG("消息队列 %s 持久化失败",name.c_str());
                    return false;
               }
           }
           _msgqueues.insert(std::make_pair(name,qup));
           return true;
       }

       // 删除消息队列
       void deleteMsgQueue(const std::string& name)
       {
           std::unique_lock<std::mutex> lock(_mtx);
           if(_msgqueues.find(name) == _msgqueues.end()) 
           {
               // 交换机不存在就不需要删除了
               return;
           }
           _mapper.remove(name);   // 数据库中删除消息队列信息
           _msgqueues.erase(name); // 内存中删除消息队列
       }

       // 获得消息队列
       MsgQueue::ptr selectMsgQueue(const std::string& name)
       {
           std::unique_lock<std::mutex> lock(_mtx);
           auto it = _msgqueues.find(name);
           if(it == _msgqueues.end())
               return nullptr;
           return it->second;
       }

       // 获得所有消息队列
       const MsgQueueMap& selectAll()
       {
           return  _msgqueues;
       }

       // 判断消息队列是否存在
       bool exists(const std::string& name)
       {
           std::unique_lock<std::mutex> lock(_mtx);
           if(_msgqueues.find(name) == _msgqueues.end())
               return false;
           return true;
       }

       // 清理所有消息队列的数据
       void clear()
       {
           std::unique_lock<std::mutex> lock(_mtx);
           _mapper.removeTable();
           _msgqueues.clear();
       }

       size_t size()
       {
           return _msgqueues.size();
       }

        
        private:
        std::mutex _mtx;
        MsgQueueMap  _msgqueues;
        MsgQueueMapper _mapper; // 持久化管理句柄
    };

        实际上 绑定管理模块 的实现也和交换机管理模块、队列管理模块十分相似,它们的管理思路是一样的,只是对外提供的接口不同。

        

第二节:单元测试

        打开mqtest目录。

        创建mq_msgqueue_test.cc文件,并做好前置工作:

#include "../mqserver/mq_msgqueue.hpp"
#include <gtest/gtest.h>

        然后把mq_exchange_test.cc的测试套件拷贝过来,然后进行修改:

zd::MsgQueueManager::ptr qmp;
// 全局测试套件------------------------------------------------
// 自己初始化自己的环境,使不同单元测试之间解耦
class MsgQueueTest :public testing::Environment
{
public:
    // 全部单元测试之前调用一次
    virtual void SetUp() override
    {
        // std::cout << "单元测试执行前的环境初始化" << std::endl;
        qmp = std::make_shared<zd::MsgQueueManager>("./data/meta.bd");
    }   

    // 全部单元测试之后调用一次
    virtual void TearDown() override
    {
        // std::cout << "单元测试执行后的环境清理" << std::endl;
        // qmp->clear();
    }
};

// 单元测试
// 测试名称与类名称相同,则会先调用SetUp
TEST(MsgQueueTest,MsgQueueTest_test1_Test)
{
    std::cout << "单元测试-1" << std::endl;
    // 声明队列
    google::protobuf::Map<std::string,std::string> args;
    args.insert({"k1","v1"});
    args.insert({"k2","v2"});
    args.insert({"k3","v3"});
        //1.  3个持久化
    qmp->declareMsgQueue("q-1",true,false,false,args);
    qmp->declareMsgQueue("q-2",true,false,false,args);
    qmp->declareMsgQueue("q-3",true,false,false,args);
        //2.  1个非持久化
    qmp->declareMsgQueue("q-4",false,false,false,args);
    // 此时队列应该有4个
    ASSERT_EQ(qmp->size(),4);

    // 移除队列
    qmp->deleteMsgQueue("q-5"); // 移除不存在的队列
    ASSERT_EQ(qmp->size(),4);
    qmp->deleteMsgQueue("q-1");
    // 此时q-1不存在,而q-2\q-3\q-4存在
    ASSERT_EQ(qmp->exists("q-1"),false);
    ASSERT_EQ(qmp->exists("q-2"),true);
    ASSERT_EQ(qmp->exists("q-3"),true);
    ASSERT_EQ(qmp->exists("q-4"),true);
    // 获取q-2交换机
    zd::MsgQueue::ptr q2 = qmp->selectMsgQueue("q-2");
    ASSERT_EQ(q2->name,"q-2");
    ASSERT_EQ(q2->autodelete,false);
    ASSERT_EQ(q2->durable,true);
    ASSERT_EQ(q2->exclusive,false);
}

TEST(MsgQueueTest,MsgQueueTest_test2_Test)
{
    std::cout << "单元测试-2" << std::endl;
    // 获取所有队列数据,进行打印其名字
    zd::MsgQueueMap m = qmp->selectAll();
    for(auto& qit:m)
    {
        LOG("%s",qit.second->name.c_str());
    }
}

TEST(MsgQueueTest,MsgQueueTest_test3_Test)
{
    std::cout << "单元测试-3" << std::endl;

}
// 单元测试全部结束后调用TearDown

// ----------------------------------------------------------
int main(int argc,char** argv)
{
    testing::InitGoogleTest(&argc,argv);

    testing::AddGlobalTestEnvironment(new MsgQueueTest); // 注册Test的所有单元测试

    if(RUN_ALL_TESTS() != 0) // 运行所有单元测试
    {
        printf("单元测试失败!\n");
    }
    return 0;
}

        编译后的执行结果: 

                ​​​​​​​        ​​​​​​​        

        结果符合预期。

        然后查看meta.bd文件中的 msgqueue_table 的数据:

        ​​​​​​​        ​​​​​​​        

        持久化的q-1被移除了,所以只剩q-2和q-3了,这也是符合预期的。

        最后注释掉单元测试-1和单元测试-2,使用单元测试-3进行拉取历史队列数据的测试:

TEST(MsgQueueTest,MsgQueueTest_test3_Test)
{
    std::cout << "单元测试-3" << std::endl;
    ASSERT_EQ(qmp->size(),2);
    ASSERT_EQ(qmp->exists("q-2"),true);
    ASSERT_EQ(qmp->exists("q-3"),true);
    for(auto& kv:qmp->selectMsgQueue("q-2")->args)
    {
        LOG("%s:%s",kv.first.c_str(),kv.second.c_str());
    }
}

        执行结果:

        ​​​​​​​        ​​​​​​​        ​​​​​​​        

        说明历史数据的拉取和 args 的处理都是正确的。

        至此,队列管理模块也完成了。

下期预告:

        之后是 绑定管理模块 的实现,它与 交换机管理模块 也是十分相似的,只是提供的接口不同。