目录
虚拟机模块在mqserver目录下实现。
第一节:代码实现
虚拟机的各种接口其实就是直接封装各个管理模块的接口。
1-1.前置代码
防重复包含、头文件、和命名空间:
#ifndef __M_VIRHOST_H__ #define __M_VIRHOST_H__ #include "mq_exchange.hpp" #include "mq_msgqueue.hpp" #include "mq_binding.hpp" #include "mq_message.hpp" namespace zd {}; #endif
1-2.成员变量
虚拟机由四个模块组合而成,它通过这4个模块的句柄管理交换机、队列、绑定和消息:
class VirtualHost { public: using ptr = std::shared_ptr<VirtualHost>; private: std::string _virhostname; // 虚拟机的名字 ExchangeManager::ptr _emp; // 交换机管理句柄 MsgQueueManager::ptr _qmp; // 队列管理句柄 BindingManager::ptr _bmp; // 绑定管理句柄 MessageManager::ptr _mmp; // 消息管理句柄 }
1-3.构造函数
virhostname:虚拟机的名字,无实际意义
base_dir:存放持久化消息的目录
dbfile:sqlite文件的路径,这个文件保存持久化的交换机、队列、绑定信息
拉取历史队列信息之后,根据已经存在的队列拉取各自的历史消息。
VirtualHost(const std::string& virhostname,const std::string& base_dir,const std::string& dbfile): _virhostname(virhostname), _emp(std::make_shared<ExchangeManager>(dbfile)), _qmp(std::make_shared<MsgQueueManager>(dbfile)), _bmp(std::make_shared<BindingManager>(dbfile)), _mmp(std::make_shared<MessageManager>(base_dir)) { // 获取所有队列信息,通过队列名称恢复历史消息 MsgQueueMap qm = _qmp->selectAll(); for(const auto& it:qm) { _mmp->initQueueMesssge(it.first); } }
1-4.交换机接口
//----------------------------------------------------------------------- // 声明交换机 bool declareExchange( const std::string& name, zd::ExchangeType type, bool durable, bool autodelete, const google::protobuf::Map<std::string,std::string>& args) { return _emp->declareExchange(name,type,durable,autodelete,args); } // 移除交换机 void deleteExchange(const std::string& name) { // 删除交换机时,还需要移除交换机的绑定 _emp->deleteExchange(name); _bmp->removeExchangeBindings(name); } //-----------------------------------------------------------------------
1-5.队列接口
队列的接口比较特殊,它包含队列管理模块和消息管理模块:
//----------------------------------------------------------------------- // 声明队列 bool declareMsgQueue( const std::string& name, bool durable, bool exclusive, bool autodelete, const google::protobuf::Map<std::string,std::string>& args) { // 先创建消息队列的消息存储管理 _mmp->initQueueMesssge(name); // 再创建这个消息队列的管理 return _qmp->declareMsgQueue(name,durable,exclusive,autodelete,args); } // 移除队列 void deleteMsgQueue(const std::string& name) { // 先删除队列的消息,再删除队列的绑定,最后删除这个队列 _mmp->destoryQueueMessage(name); _bmp->removeMsgQueueBindings(name); _qmp->deleteMsgQueue(name); } //-----------------------------------------------------------------------
1-6.绑定接口
//----------------------------------------------------------------------- // 绑定 bool bind(const std::string& ename,const std::string& qname,const std::string& bid_ky) { // 如果 交换机 和 队列 都需要持久化,那么这条 绑定 也需要持久化 Exchange::ptr ep = _emp->selectExchange(ename); if(ep.get() == nullptr) { LOG("绑定失败,交换机 %s 不存在",ename.c_str()) return false; } MsgQueue::ptr qp = _qmp->selectMsgQueue(qname); if(qp.get() == nullptr) { LOG("绑定失败,队列 %s 不存在",qname.c_str()) return false; } return _bmp->bind(ename,qname,bid_ky,ep->durable && qp->durable); } // 解绑 void unBind(const std::string& ename,const std::string& qname) { _bmp->unBind(ename,qname); } //-----------------------------------------------------------------------
1-7.消息接口
//----------------------------------------------------------------------- // 获得指定交换机的所有绑定关系 MsgQueueBindingMap exchangeBindings(const std::string& ename) { return _bmp->getExchangeBindings(ename); } // 获得指定交换机的信息 Exchange::ptr getOneExchange(const std::string& ename) { return _emp->selectExchange(ename); } // 获取所有队列信息 MsgQueueMap getAllMsgQueue() { return _qmp->selectAll(); } //-----------------------------------------------------------------------
1-8.其他功能接口
其他可能用到的接口
//----------------------------------------------------------------------- // 获得指定交换机的所有绑定关系 MsgQueueBindingMap exchangeBindings(const std::string& ename) { return _bmp->getExchangeBindings(ename); } // 获得指定交换机的信息 Exchange::ptr getOneExchange(const std::string& ename) { return _emp->selectExchange(ename); } // 获取所有队列信息 MsgQueueMap getAllMsgQueue() { return _qmp->selectAll(); } //----------------------------------------------------------------------- // 判断存在 bool exchangeExists(const std::string& ename) { return _emp->exists(ename); } bool msgqueueExists(const std::string& qname) { return _qmp->exists(qname); } bool bindingExists(const std::string& ename,const std::string& qname) { return _bmp->exists(ename,qname); } // 清理 void clear() { _emp->clear(); _qmp->clear(); _bmp->clear(); _mmp->clear(); } //-----------------------------------------------------------------------
虚拟机的实现就是如此简单,因为要用的接口都实现了,封装一下即可。
第二节:单元测试
在mqtest目录下创建mq_virtualhost_test.cc文件,因为封装比较简单,底层的模块也已经测试过来,所以只简单测试一下即可。
虚拟机使用独立测试套件,它的特征是对于每个单元测试,环境初始化函数和清理函数都会调用一次,保证每个单元测试的独立性。
#include "../mqserver/mq_virtualhost.hpp" #include <gtest/gtest.h> #include <iostream> #include <unordered_set> zd::VirtualHost::ptr vhp; // 独立测试套件------------------------------------------------ // 与全局测试套件相比,可以定义成员变量,且成员变量仅相关单元测试可见 // 不需要注册测试单元 class HostTest:public testing::Test { public: // 每个单元测试之前都会调用一次 virtual void SetUp() override { std::cout << "每个单元测试执行前的环境初始化" << std::endl; vhp = std::make_shared<zd::VirtualHost>("host1","./data/host1/","./data/host1/meta.bd"); // 测试声明 std::unordered_map<std::string,std::string> m; vhp->declareExchange("e1",zd::ExchangeType::DIRECT,true,false,m); vhp->declareExchange("e2",zd::ExchangeType::DIRECT,true,false,m); vhp->declareExchange("e3",zd::ExchangeType::DIRECT,true,false,m); vhp->declareExchange("e4",zd::ExchangeType::DIRECT,true,false,m); vhp->declareExchange("e5",zd::ExchangeType::DIRECT,true,false,m); vhp->declareMsgQueue("q1",true,false,false,m); vhp->declareMsgQueue("q2",true,false,false,m); vhp->declareMsgQueue("q3",true,false,false,m); vhp->declareMsgQueue("q4",true,false,false,m); vhp->declareMsgQueue("q5",true,false,false,m); // 测试绑定 vhp->bind("e1","q1",""); vhp->bind("e2","q2",""); vhp->bind("e3","q3",""); vhp->bind("e4","q4",""); vhp->bind("e5","q5",""); vhp->bind("e1","q5",""); } // 每个单元测试之后都会调用一次 virtual void TearDown() override { std::cout << "每个单元测试执行后的环境清理" << std::endl; vhp->clear(); } }; // 测试声明和绑定 TEST_F(HostTest,HostTest_test1_Test) { std::cout << "单元测试-1" << std::endl; ASSERT_EQ(vhp->exchangeExists("e1"),true); ASSERT_EQ(vhp->exchangeExists("e2"),true); ASSERT_EQ(vhp->exchangeExists("e3"),true); ASSERT_EQ(vhp->exchangeExists("e4"),true); ASSERT_EQ(vhp->exchangeExists("e5"),true); ASSERT_EQ(vhp->msgqueueExists("q1"),true); ASSERT_EQ(vhp->msgqueueExists("q2"),true); ASSERT_EQ(vhp->msgqueueExists("q3"),true); ASSERT_EQ(vhp->msgqueueExists("q4"),true); ASSERT_EQ(vhp->msgqueueExists("q5"),true); ASSERT_EQ(vhp->bindingExists("e1","q1"),true); ASSERT_EQ(vhp->bindingExists("e2","q2"),true); ASSERT_EQ(vhp->bindingExists("e3","q3"),true); ASSERT_EQ(vhp->bindingExists("e4","q4"),true); ASSERT_EQ(vhp->bindingExists("e5","q5"),true); ASSERT_EQ(vhp->bindingExists("e1","q5"),true); } // 测试消息的推送 TEST_F(HostTest,HostTest_test2_Test) { std::cout << "单元测试-2" << std::endl; vhp->basicPublish("q1",nullptr,"Hello World-1"); zd::MessagePtr msgp = vhp->basicConsume("q1"); ASSERT_EQ(msgp->payload().body(),"Hello World-1"); zd::MessagePtr msgp_ = vhp->basicConsume("q1"); ASSERT_EQ(msgp_.get(),nullptr); zd::MessagePtr msgp__ = vhp->basicConsume("q6"); } // 测试消息的确认 TEST_F(HostTest,HostTest_test3_Test) { std::cout << "单元测试-3" << std::endl; zd::BasicProperties pb; std::string id = zd::UUIDHelper::uuid(); pb.set_id(id); pb.set_delivery_mode(zd::DeliveryMode::DURABLE); pb.set_routing_key("123"); vhp->basicPublish("q1",&pb,"Hello World-1"); vhp->basicConsume("q1"); vhp->basicAck("q1",id); ASSERT_EQ(vhp->basicConsume("q1"),nullptr); } // ---------------------------------------------------------- int main(int argc,char** argv) { testing::InitGoogleTest(&argc,argv); if(RUN_ALL_TESTS() != 0) // 运行所有单元测试 { printf("单元测试失败!\n"); } return 0; }
编译:
mq_virhost_test:mq_virhost_test.cc ../mqcommon/mq_msg.pb.cc g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf -lsqlite3
执行结果:
符合预期 。
那么虚拟机的"整合"也完成了。
下期预告:
之后将完成与binding_key和routing_key有关的模块——Router,它提供3个功能:
(1)routing_key的合法性检验
(2)binding_key的合法性检验
(3)匹配routing_key和binding_key,并返回匹配结果