第八章:虚拟机模块的整合

发布于:2025-02-26 ⋅ 阅读:(18) ⋅ 点赞:(0)

目录

第一节:代码实现

        1-1.前置代码

        1-2.成员变量

        1-3.构造函数

        1-4.交换机接口

        1-5.队列接口

        1-6.绑定接口

        1-7.消息接口

 

        1-8.其他功能接口

第二节:单元测试

下期预告: 


        虚拟机模块在mqserver目录下实现。

第一节:代码实现

        虚拟机的各种接口其实就是直接封装各个管理模块的接口。

        1-1.前置代码

        防重复包含、头文件、和命名空间:

#ifndef __M_VIRHOST_H__
#define __M_VIRHOST_H__

#include "mq_exchange.hpp"
#include "mq_msgqueue.hpp"
#include "mq_binding.hpp"
#include "mq_message.hpp" 

namespace zd
{};

#endif

        1-2.成员变量

        虚拟机由四个模块组合而成,它通过这4个模块的句柄管理交换机、队列、绑定和消息:

    class VirtualHost
    {
        public:
            using ptr = std::shared_ptr<VirtualHost>;    
        private:
            std::string _virhostname;   // 虚拟机的名字
            ExchangeManager::ptr _emp;  // 交换机管理句柄
            MsgQueueManager::ptr _qmp;  // 队列管理句柄
            BindingManager::ptr _bmp;   // 绑定管理句柄
            MessageManager::ptr _mmp;   // 消息管理句柄
    }

        1-3.构造函数

        virhostname:虚拟机的名字,无实际意义

        base_dir:存放持久化消息的目录

        dbfile:sqlite文件的路径,这个文件保存持久化的交换机、队列、绑定信息

        拉取历史队列信息之后,根据已经存在的队列拉取各自的历史消息。

            VirtualHost(const std::string& virhostname,const std::string& base_dir,const std::string& dbfile):
            _virhostname(virhostname),
            _emp(std::make_shared<ExchangeManager>(dbfile)),
            _qmp(std::make_shared<MsgQueueManager>(dbfile)),
            _bmp(std::make_shared<BindingManager>(dbfile)),
            _mmp(std::make_shared<MessageManager>(base_dir))
            {
                // 获取所有队列信息,通过队列名称恢复历史消息
                MsgQueueMap qm = _qmp->selectAll();
                for(const auto& it:qm)
                {
                    _mmp->initQueueMesssge(it.first);
                }
            }

        1-4.交换机接口

            //-----------------------------------------------------------------------
            // 声明交换机
            bool declareExchange(
                const std::string& name,
                zd::ExchangeType type,
                bool durable,
                bool autodelete,
                const google::protobuf::Map<std::string,std::string>& args)
            {
                return _emp->declareExchange(name,type,durable,autodelete,args);
            }
            // 移除交换机
            void deleteExchange(const std::string& name)
            {
                // 删除交换机时,还需要移除交换机的绑定
                _emp->deleteExchange(name);
                _bmp->removeExchangeBindings(name);
            }
            //-----------------------------------------------------------------------

        1-5.队列接口

        队列的接口比较特殊,它包含队列管理模块和消息管理模块:

            //-----------------------------------------------------------------------
            // 声明队列
            bool declareMsgQueue(
                const std::string& name,
                bool durable,
                bool exclusive,
                bool autodelete, 
                const google::protobuf::Map<std::string,std::string>& args)
            {
                // 先创建消息队列的消息存储管理
                _mmp->initQueueMesssge(name);
                // 再创建这个消息队列的管理
                return _qmp->declareMsgQueue(name,durable,exclusive,autodelete,args);
            }
            // 移除队列
            void deleteMsgQueue(const std::string& name)
            {
                // 先删除队列的消息,再删除队列的绑定,最后删除这个队列
                _mmp->destoryQueueMessage(name);
                _bmp->removeMsgQueueBindings(name);
                _qmp->deleteMsgQueue(name);
            }
            //-----------------------------------------------------------------------

        1-6.绑定接口

            //-----------------------------------------------------------------------
            // 绑定
            bool bind(const std::string& ename,const std::string& qname,const std::string& bid_ky)
            {
                // 如果 交换机 和 队列 都需要持久化,那么这条 绑定 也需要持久化
                Exchange::ptr ep = _emp->selectExchange(ename);
                if(ep.get() == nullptr)
                {
                    LOG("绑定失败,交换机 %s 不存在",ename.c_str())
                    return false;
                } 
                MsgQueue::ptr qp = _qmp->selectMsgQueue(qname);
                if(qp.get() == nullptr)
                {
                    LOG("绑定失败,队列 %s 不存在",qname.c_str())
                    return false;
                }
                return _bmp->bind(ename,qname,bid_ky,ep->durable && qp->durable);
            }
            // 解绑
            void unBind(const std::string& ename,const std::string& qname)
            {
                _bmp->unBind(ename,qname);
            }
            //-----------------------------------------------------------------------

        1-7.消息接口

 

            //-----------------------------------------------------------------------
            // 获得指定交换机的所有绑定关系
            MsgQueueBindingMap exchangeBindings(const std::string& ename)
            {
                return _bmp->getExchangeBindings(ename);
            }
            // 获得指定交换机的信息
            Exchange::ptr getOneExchange(const std::string& ename)
            {
                return _emp->selectExchange(ename);
            }
            // 获取所有队列信息
            MsgQueueMap getAllMsgQueue()
            {
                return _qmp->selectAll();
            }
            //-----------------------------------------------------------------------

        1-8.其他功能接口

        其他可能用到的接口

            //-----------------------------------------------------------------------
            // 获得指定交换机的所有绑定关系
            MsgQueueBindingMap exchangeBindings(const std::string& ename)
            {
                return _bmp->getExchangeBindings(ename);
            }
            // 获得指定交换机的信息
            Exchange::ptr getOneExchange(const std::string& ename)
            {
                return _emp->selectExchange(ename);
            }
            // 获取所有队列信息
            MsgQueueMap getAllMsgQueue()
            {
                return _qmp->selectAll();
            }
            //-----------------------------------------------------------------------
            // 判断存在
            bool exchangeExists(const std::string& ename)
            {
                return _emp->exists(ename);
            }

            bool msgqueueExists(const std::string& qname)
            {
                return _qmp->exists(qname);
            }

            bool bindingExists(const std::string& ename,const std::string& qname)
            {
                return _bmp->exists(ename,qname);
            }
            // 清理
            void clear()
            {
                _emp->clear();
                _qmp->clear();
                _bmp->clear();
                _mmp->clear();
            }
            //-----------------------------------------------------------------------

        虚拟机的实现就是如此简单,因为要用的接口都实现了,封装一下即可。

第二节:单元测试

        在mqtest目录下创建mq_virtualhost_test.cc文件,因为封装比较简单,底层的模块也已经测试过来,所以只简单测试一下即可。

        虚拟机使用独立测试套件,它的特征是对于每个单元测试,环境初始化函数和清理函数都会调用一次,保证每个单元测试的独立性。

#include "../mqserver/mq_virtualhost.hpp"

#include <gtest/gtest.h>
#include <iostream>
#include <unordered_set>

zd::VirtualHost::ptr vhp;
// 独立测试套件------------------------------------------------
// 与全局测试套件相比,可以定义成员变量,且成员变量仅相关单元测试可见
// 不需要注册测试单元
class HostTest:public testing::Test
{
public:
    // 每个单元测试之前都会调用一次
    virtual void SetUp() override
    {
        std::cout << "每个单元测试执行前的环境初始化" << std::endl;
        vhp = std::make_shared<zd::VirtualHost>("host1","./data/host1/","./data/host1/meta.bd");

        // 测试声明
        std::unordered_map<std::string,std::string> m;
        vhp->declareExchange("e1",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e2",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e3",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e4",zd::ExchangeType::DIRECT,true,false,m);
        vhp->declareExchange("e5",zd::ExchangeType::DIRECT,true,false,m);

        vhp->declareMsgQueue("q1",true,false,false,m);
        vhp->declareMsgQueue("q2",true,false,false,m);
        vhp->declareMsgQueue("q3",true,false,false,m);
        vhp->declareMsgQueue("q4",true,false,false,m); 
        vhp->declareMsgQueue("q5",true,false,false,m);

        // 测试绑定
        vhp->bind("e1","q1","");   
        vhp->bind("e2","q2","");
        vhp->bind("e3","q3",""); 
        vhp->bind("e4","q4","");
        vhp->bind("e5","q5","");
        vhp->bind("e1","q5","");
    }

    // 每个单元测试之后都会调用一次
    virtual void TearDown() override
    {
        std::cout << "每个单元测试执行后的环境清理" << std::endl;
        vhp->clear();
    }
};

// 测试声明和绑定
TEST_F(HostTest,HostTest_test1_Test)
{ 
    std::cout << "单元测试-1" << std::endl;
    ASSERT_EQ(vhp->exchangeExists("e1"),true);
    ASSERT_EQ(vhp->exchangeExists("e2"),true);
    ASSERT_EQ(vhp->exchangeExists("e3"),true);
    ASSERT_EQ(vhp->exchangeExists("e4"),true);
    ASSERT_EQ(vhp->exchangeExists("e5"),true);

    ASSERT_EQ(vhp->msgqueueExists("q1"),true);
    ASSERT_EQ(vhp->msgqueueExists("q2"),true);
    ASSERT_EQ(vhp->msgqueueExists("q3"),true);
    ASSERT_EQ(vhp->msgqueueExists("q4"),true);
    ASSERT_EQ(vhp->msgqueueExists("q5"),true);

    ASSERT_EQ(vhp->bindingExists("e1","q1"),true);
    ASSERT_EQ(vhp->bindingExists("e2","q2"),true);
    ASSERT_EQ(vhp->bindingExists("e3","q3"),true);
    ASSERT_EQ(vhp->bindingExists("e4","q4"),true);
    ASSERT_EQ(vhp->bindingExists("e5","q5"),true);
    ASSERT_EQ(vhp->bindingExists("e1","q5"),true);
}

// 测试消息的推送
TEST_F(HostTest,HostTest_test2_Test)
{
    std::cout << "单元测试-2" << std::endl;
    vhp->basicPublish("q1",nullptr,"Hello World-1");

    zd::MessagePtr msgp = vhp->basicConsume("q1");
    ASSERT_EQ(msgp->payload().body(),"Hello World-1");
    zd::MessagePtr msgp_ = vhp->basicConsume("q1");
    ASSERT_EQ(msgp_.get(),nullptr);

    zd::MessagePtr msgp__ = vhp->basicConsume("q6");

}

// 测试消息的确认
TEST_F(HostTest,HostTest_test3_Test)
{
    std::cout << "单元测试-3" << std::endl;
    zd::BasicProperties pb;
    std::string id = zd::UUIDHelper::uuid();
    pb.set_id(id);
    pb.set_delivery_mode(zd::DeliveryMode::DURABLE); 
    pb.set_routing_key("123");
    vhp->basicPublish("q1",&pb,"Hello World-1");
    vhp->basicConsume("q1");
    vhp->basicAck("q1",id);
    ASSERT_EQ(vhp->basicConsume("q1"),nullptr);    
}

// ----------------------------------------------------------
 
int main(int argc,char** argv)
{
    testing::InitGoogleTest(&argc,argv);

    if(RUN_ALL_TESTS() != 0) // 运行所有单元测试
    {
        printf("单元测试失败!\n");
    }
    return 0;
}

        编译:

mq_virhost_test:mq_virhost_test.cc ../mqcommon/mq_msg.pb.cc
	g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf -lsqlite3

        执行结果:

        符合预期 。

        那么虚拟机的"整合"也完成了。        

下期预告: 

        之后将完成与binding_key和routing_key有关的模块——Router,它提供3个功能:

        (1)routing_key的合法性检验

        (2)binding_key的合法性检验

        (3)匹配routing_key和binding_key,并返回匹配结果


网站公告

今日签到

点亮在社区的每一天
去签到