目录
队列管理模块在mqserver目录下实现。
第一节:代码实现
交换机管理模块和队列管理模块十分相似,因为它们的管理逻辑是一样的,都是使用sqlite数据库文件进行持久化管理,名字:信息 的映射进行内存管理。
很多代码都可以直接拷贝,稍加修改即可。
创建一个名为mq_msgqueue.hpp的文件,打开并防止重复包含、添加所需头文件、声明命名空间,这些内容可以在mq_exchange.hpp中拷贝:
#ifndef __M_MSGQUEUE_H__ #define __M_MSGQUEUE_H__ #include "../mqcommon/mq_logger.hpp" #include "../mqcommon/mq_helper.hpp" #include "../mqcommon/mq_msg.pb.h" #include <google/protobuf/map.h> #include <memory> #include <iostream> #include <unordered_map> namespace zd {}; #endif
1-1.MsgQueue类
同样先定义一个交换机类,它保存交换机的各种信息:
(1)队列名称
(2)持久化标志
(3)独占标志:如果设置为true,那么队列最多只能有一个订阅者
(4)自动删除标志:当队列没有绑定,没有订阅者时删除队列
(5)其他参数:保存一些客户端可自定义的信息
因为交换机也有 args 成员,所以它也需要 args 的两个处理接口,直接从 class Exchange 拷贝即可。
// 1.定义消息队列类 class MsgQueue { public: using ptr = std::shared_ptr<MsgQueue>; std::string name; // 队列名称 bool durable; // 持久化标志 bool exclusive; // 独占标志 bool autodelete; // 自动删除标志 google::protobuf::Map<std::string,std::string> args; // 其他参数 MsgQueue(){} MsgQueue(const std::string& qname,bool qdurable,bool qexclusive, bool qautodelete, const google::protobuf::Map<std::string,std::string>& qargs): name(qname), durable(qdurable), exclusive(qexclusive), autodelete(qautodelete), args(qargs) {} // args在文件中会以 k1=v1&k2=v2&k3=v3 的形式存储 // 读取上述内容后setArgs解析这个字符串,再存储到this->args中 void setArgs(const std::string& str_args) { std::vector<std::string> sub_args; size_t ret = StrHelper::split(str_args,"&",sub_args); for(auto& str:sub_args) { size_t pos = str.find("="); std::string key = str.substr(0,pos); std::string val = str.substr(pos+1); args[key] = val; } } // 将this->args序列化成 k1=v1&k2=v2&k3=v3 的形式 std::string getArgs() { std::string result; for(auto& it:args) { result+=it.first+"="+it.second+"&"; } if(!result.empty()) result.pop_back(); return result; } };
1-2.MsgQueueMapper类
class MsgQueueMapper用于队列的持久化管理,在实现它之前,也需要先重定义一个unordered_map:
using MsgQueueMap = std::unordered_map<std::string,MsgQueue::ptr>;
它提供的接口与class ExchageMapper基本一致,只是操作的表由 exchange_table 变成了 msgqueue_table,持久化数据也变化了一些:
// 定义消息队列数据持久化管理类 class MsgQueueMapper { public: MsgQueueMapper(const std::string& dbfile): _sql_helper(dbfile) { std::string path = FileHelper::parentDirectory(dbfile); FileHelper::createDirectory(path); assert(_sql_helper.open()); } // 创建表 void createTable() { #define CREATE_TABLE_ "create table if not exists msgqueue_table(\ name varchar(32) primary key,\ durable int,\ exclusive int,\ autodelete int,\ args varchar(128));" bool ret = _sql_helper.exec(CREATE_TABLE_,nullptr,nullptr); if(ret == false) { LOG("创建消息队列数据库表失败!"); abort(); // 异常退出 } } // 删除表 void removeTable() { #define DROP_TABLE_ "drop table if exists msgqueue_table;" bool ret = _sql_helper.exec(DROP_TABLE_,nullptr,nullptr); if(ret == false) { LOG("删除消息队列数据库表失败!"); abort(); // 异常退出 } } // 插入一个消息队列 bool insert(MsgQueue::ptr& exp) { #define INSERT_SQL_ "insert into msgqueue_table values('%s','%d','%d','%d','%s');" char sql_str[4096] = {0}; sprintf(sql_str,INSERT_SQL_,exp->name.c_str(),exp->durable,exp->exclusive,exp->autodelete,exp->getArgs().c_str()); bool ret = _sql_helper.exec(sql_str,nullptr,nullptr); if(ret == false) { LOG("数据库新增消息队列信息失败!"); return false; } return true; } // 删除一个消息队列 void remove(const std::string& qcname) { std::string DELE_SQL = "delete from msgqueue_table where name='"+qcname+"';"; bool ret = _sql_helper.exec(DELE_SQL,nullptr,nullptr); if(ret == false) { LOG("数据库删除消息队列信息失败!"); abort(); // 异常退出 } } // 从数据库中恢复消息队列数据 std::unordered_map<std::string,MsgQueue::ptr> recovery() { createTable(); MsgQueueMap result; std::string sql = "select name,durable,exclusive,autodelete,args from msgqueue_table;"; // 它会根据数据库的数据量调整回调函数的调用次数,每次取出一套数据 _sql_helper.exec(sql.c_str(),MsgQueueMapper::selectCallback,&result); return result; } private: static int selectCallback(void* arg,int numcol,char** row,char** fields) { MsgQueueMap* result = (MsgQueueMap*)arg; auto qup = std::make_shared<MsgQueue>(); qup->name = row[0]; qup->durable = (bool)std::stoi(row[1]); qup->exclusive = (bool)std::stoi(row[2]); qup->autodelete = (bool)std::stoi(row[3]); if(row[4]) qup->setArgs(row[4]); result->insert(std::make_pair(qup->name,qup)); return 0; } private: zd::SqliteHelper _sql_helper; };
1-3.MsgQueueManager类
它的实现与 class ExchangeManager 基本一致:
class MsgQueueManager { public: using ptr = std::shared_ptr<MsgQueueManager>; MsgQueueManager(const std::string& dbfile): _mapper(dbfile) { _msgqueues = _mapper.recovery(); } // 声明消息队列 bool declareMsgQueue(const std::string& name,bool durable,bool exclusive, bool autodelete, const google::protobuf::Map<std::string,std::string>& args) { std::unique_lock<std::mutex> lock(_mtx); if(_msgqueues.find(name) != _msgqueues.end()) { // 消息队列已经存在就不需要添加了 return true; } auto qup = std::make_shared<MsgQueue>(name,durable,exclusive,autodelete,args); if(durable == true) { bool ret = _mapper.insert(qup); if(ret == false) { LOG("消息队列 %s 持久化失败",name.c_str()); return false; } } _msgqueues.insert(std::make_pair(name,qup)); return true; } // 删除消息队列 void deleteMsgQueue(const std::string& name) { std::unique_lock<std::mutex> lock(_mtx); if(_msgqueues.find(name) == _msgqueues.end()) { // 交换机不存在就不需要删除了 return; } _mapper.remove(name); // 数据库中删除消息队列信息 _msgqueues.erase(name); // 内存中删除消息队列 } // 获得消息队列 MsgQueue::ptr selectMsgQueue(const std::string& name) { std::unique_lock<std::mutex> lock(_mtx); auto it = _msgqueues.find(name); if(it == _msgqueues.end()) return nullptr; return it->second; } // 获得所有消息队列 const MsgQueueMap& selectAll() { return _msgqueues; } // 判断消息队列是否存在 bool exists(const std::string& name) { std::unique_lock<std::mutex> lock(_mtx); if(_msgqueues.find(name) == _msgqueues.end()) return false; return true; } // 清理所有消息队列的数据 void clear() { std::unique_lock<std::mutex> lock(_mtx); _mapper.removeTable(); _msgqueues.clear(); } size_t size() { return _msgqueues.size(); } private: std::mutex _mtx; MsgQueueMap _msgqueues; MsgQueueMapper _mapper; // 持久化管理句柄 };
实际上 绑定管理模块 的实现也和交换机管理模块、队列管理模块十分相似,它们的管理思路是一样的,只是对外提供的接口不同。
第二节:单元测试
打开mqtest目录。
创建mq_msgqueue_test.cc文件,并做好前置工作:
#include "../mqserver/mq_msgqueue.hpp" #include <gtest/gtest.h>
然后把mq_exchange_test.cc的测试套件拷贝过来,然后进行修改:
zd::MsgQueueManager::ptr qmp; // 全局测试套件------------------------------------------------ // 自己初始化自己的环境,使不同单元测试之间解耦 class MsgQueueTest :public testing::Environment { public: // 全部单元测试之前调用一次 virtual void SetUp() override { // std::cout << "单元测试执行前的环境初始化" << std::endl; qmp = std::make_shared<zd::MsgQueueManager>("./data/meta.bd"); } // 全部单元测试之后调用一次 virtual void TearDown() override { // std::cout << "单元测试执行后的环境清理" << std::endl; // qmp->clear(); } }; // 单元测试 // 测试名称与类名称相同,则会先调用SetUp TEST(MsgQueueTest,MsgQueueTest_test1_Test) { std::cout << "单元测试-1" << std::endl; // 声明队列 google::protobuf::Map<std::string,std::string> args; args.insert({"k1","v1"}); args.insert({"k2","v2"}); args.insert({"k3","v3"}); //1. 3个持久化 qmp->declareMsgQueue("q-1",true,false,false,args); qmp->declareMsgQueue("q-2",true,false,false,args); qmp->declareMsgQueue("q-3",true,false,false,args); //2. 1个非持久化 qmp->declareMsgQueue("q-4",false,false,false,args); // 此时队列应该有4个 ASSERT_EQ(qmp->size(),4); // 移除队列 qmp->deleteMsgQueue("q-5"); // 移除不存在的队列 ASSERT_EQ(qmp->size(),4); qmp->deleteMsgQueue("q-1"); // 此时q-1不存在,而q-2\q-3\q-4存在 ASSERT_EQ(qmp->exists("q-1"),false); ASSERT_EQ(qmp->exists("q-2"),true); ASSERT_EQ(qmp->exists("q-3"),true); ASSERT_EQ(qmp->exists("q-4"),true); // 获取q-2交换机 zd::MsgQueue::ptr q2 = qmp->selectMsgQueue("q-2"); ASSERT_EQ(q2->name,"q-2"); ASSERT_EQ(q2->autodelete,false); ASSERT_EQ(q2->durable,true); ASSERT_EQ(q2->exclusive,false); } TEST(MsgQueueTest,MsgQueueTest_test2_Test) { std::cout << "单元测试-2" << std::endl; // 获取所有队列数据,进行打印其名字 zd::MsgQueueMap m = qmp->selectAll(); for(auto& qit:m) { LOG("%s",qit.second->name.c_str()); } } TEST(MsgQueueTest,MsgQueueTest_test3_Test) { std::cout << "单元测试-3" << std::endl; } // 单元测试全部结束后调用TearDown // ---------------------------------------------------------- int main(int argc,char** argv) { testing::InitGoogleTest(&argc,argv); testing::AddGlobalTestEnvironment(new MsgQueueTest); // 注册Test的所有单元测试 if(RUN_ALL_TESTS() != 0) // 运行所有单元测试 { printf("单元测试失败!\n"); } return 0; }
编译后的执行结果:
结果符合预期。
然后查看meta.bd文件中的 msgqueue_table 的数据:
持久化的q-1被移除了,所以只剩q-2和q-3了,这也是符合预期的。
最后注释掉单元测试-1和单元测试-2,使用单元测试-3进行拉取历史队列数据的测试:
TEST(MsgQueueTest,MsgQueueTest_test3_Test) { std::cout << "单元测试-3" << std::endl; ASSERT_EQ(qmp->size(),2); ASSERT_EQ(qmp->exists("q-2"),true); ASSERT_EQ(qmp->exists("q-3"),true); for(auto& kv:qmp->selectMsgQueue("q-2")->args) { LOG("%s:%s",kv.first.c_str(),kv.second.c_str()); } }
执行结果:
说明历史数据的拉取和 args 的处理都是正确的。
至此,队列管理模块也完成了。
下期预告:
之后是 绑定管理模块 的实现,它与 交换机管理模块 也是十分相似的,只是提供的接口不同。