C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(二)

发布于:2025-07-22 ⋅ 阅读:(19) ⋅ 点赞:(0)

目录

交换机数据管理

交换机数据类

交换机数据持久化类

交换机数据管理类

测试

 


交换机数据管理

  • 定义交换机数据类
  1. 交换机名称
  2. 交换机类型
  3. 是否持久化标志
  4. 是否自动删除标志
  5. 其他参数
  •  定义交换机数据持久化类(数据持久化的 sqlite3 数据库中)
  1. 创建/删除交换机数据表
  2. 新增交换机数据
  3. 移除交换机数据
  4. 查询所有交换机数据
  5. 查询指定交换机数据(根据名称)
  • 定义交换机数据管理类
  1. 声明交换机,并添加管理(存在则 OK,不存在则创建)
  2. 删除交换机
  3. 获取指定交换机
  4. 销毁所有交换机数据

交换机数据类

    struct Exchange
    {
        using ptr = std::shared_ptr<Exchange>;
        // 交换机名称
        std::string name;
        // 交换机类型
        ExchangeType type;
        // 持久化标志
        bool durable;
        // 自动删除标志
        bool auto_delete;
        // 其他参数
        std::unordered_map<std::string, std::string> args;

        Exchange() {}
        Exchange(const std::string &ename,
                 ExchangeType etype,
                 bool edurable,
                 bool eauto_delete,
                 const std::unordered_map<std::string, std::string> &eargs)
            : name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs)
        {
        }

        void setArgs(const std::string &str_args)
        {
            // key=val&key=val.....
            std::vector<std::string> sub_args;
            StrHelper::split(str_args, "&", sub_args);
            for (auto &arg : sub_args)
            {
                size_t pos = arg.find("=");
                std::string key = arg.substr(0, pos);
                std::string val = arg.substr(pos + 1);
                args.insert(std::make_pair(key, val));
            }
        }
        std::string getArgs()
        {
            if (args.empty())
                return "";

            std::string result;
            for (auto &arg : args)
            {
                result += arg.first + "=" + arg.second + "&";
            }
            result.pop_back();
            return result;
        }
    };

需要说明的有:

  • 对于其他参数,我们用一个unordered_map容器存储,与相应的字符串转化的格式是:key=val&key=val&key=val....
  • 在getArgs函数中必须添加对args的判空操作,否则,args为空时,result会为空,调用pop_back函数会发生段错误。

交换机数据持久化类

    class ExchangeMapper
    {
    public:
        ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile)
        {
            std::string path = FileHelper::parentDirectory(dbfile);
            FileHelper::createDirectory(path);
            assert(_sql_helper.open());
            createTable();
        }
        void createTable()
        {
#define CREATE_TABLE "create table if not exists exchange_table(\
                              name varchar(32) primary key, \
                              type int, \
                              durable int, \
                              auto_delete int, \
                              args varchar(128));"
            bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
            if (!ret)
            {
                ERROR("创建交换机数据库表失败");
                abort();
            }
        }
        void removeTable()
        {
#define DROP_TABLE "drop table if exists exchange_table;"
            bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);
            if (!ret)
            {
                ERROR("删除交换机数据库表失败");
                abort();
            }
        }
        void insert(Exchange::ptr &exchange)
        {
            std::stringstream ss;
            ss << "insert into exchange_table values('"
               << exchange->name << "',"
               << exchange->type << ","
               << exchange->durable << ","
               << exchange->auto_delete << ","
               << "'" << exchange->getArgs() << "');";
            bool ret = _sql_helper.exec(ss.str(), nullptr, nullptr);
            if (!ret)
                return;
        }
        void remove(const std::string &name)
        {
            std::stringstream ss;
            ss << "delete from exchange_table where name = "
               << "'" << name << "';";
            _sql_helper.exec(ss.str(), nullptr, nullptr);
        }
        using ExchangMap = std::unordered_map<std::string, Exchange::ptr>;
        ExchangMap recovery()
        {
            ExchangMap result;
            std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
            _sql_helper.exec(sql, selectCallback, &result);
            return result;
        }

    private:
        static int selectCallback(void *arg, int numcol, char **row, char **fields)
        {
            ExchangMap *result = (ExchangMap *)arg;
            auto exp = std::make_shared<Exchange>();
            exp->name = row[0];
            exp->type = (ExchangeType)std::stoi(row[1]);
            exp->durable = (bool)std::stoi(row[2]);
            exp->auto_delete = (bool)std::stoi(row[3]);
            if (row[4])
                exp->setArgs(row[4]);
            result->insert(std::make_pair(exp->name, exp));
            return 0;
        }

    private:
        SqliteHelper _sql_helper;
    };

说明:

  • 在构造对象时就把相应的数据库打开,如果没有就创建,但要注意一定要先创建好数据库所在目录,在创建表(如果不存在)。
  • 前四个函数就是对sql语言的一个使用。
  • recovery方法中我们将查询出来的结果以交换机名称为键值,存储在一个unordered_map容器中,以拿到所有交换机。

交换机数据管理类

    class ExchangeManager
    {
    public:
        using ptr = std::shared_ptr<ExchangeManager>;
        ExchangeManager(const std::string &dbfile) : _mapper(dbfile)
        {
            _exchanges = _mapper.recovery();
        }
        // 声明交换机
        void declareExchange(const std::string &name,
                             ExchangeType type,
                             bool durable,
                             bool auto_delete,
                             std::unordered_map<std::string, std::string> &args)
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // 先判断是否存在
            auto it = _exchanges.find(name);
            if (it != _exchanges.end())
                return;

            // 新增交换机
            auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
            _exchanges.insert(std::make_pair(name, exp));
            if (durable)
                _mapper.insert(exp);
        }

        // 删除交换机
        void deleteExchange(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // 先判断是否存在
            auto it = _exchanges.find(name);
            if (it == _exchanges.end())
                return;

            if (_exchanges[name]->durable)
                _mapper.remove(name);
            _exchanges.erase(name);
        }

        // 获取指定交换机
        Exchange::ptr selectExchange(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // 先判断是否存在
            auto it = _exchanges.find(name);
            if (it == _exchanges.end())
                return nullptr;

            return it->second;
        }

        // 判断交换机是否存在
        bool exists(const std::string &name)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            // 先判断是否存在
            auto it = _exchanges.find(name);
            if (it == _exchanges.end())
                return false;
            return true;
        }

        size_t size()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _exchanges.size();
        }

        // 清理所有交换机数据
        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _mapper.removeTable();
            _exchanges.clear();
        }

    private:
        std::mutex _mutex;
        ExchangeMapper _mapper;
        std::unordered_map<std::string, Exchange::ptr> _exchanges;
    };

说明:

  • 此类是真正要让用户使用的类,包含有几个成员属性:
    • 互斥锁:支持多线程同时访问。
    • ExchangeMapper对象:对数据库进行操作,获取所有交换机。
    • _exchanges:存储所有交换机,方便用户获取。
  • 在构造时就调用_mapper的recovery方法拿到所有交换机。
  • 所有方法对于判断交换机是否存在的操作必须独自进行,不能复用exists方法,否则会导致死锁。

测试

 

#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>

jiuqi::ExchangeManager::ptr emp;

class ExchangeTest : public testing::Environment
{
public:
    virtual void SetUp() override
    {
        emp = std::make_shared<jiuqi::ExchangeManager>("./data/meta.db");
    }

    virtual void TearDown() override
    {
        // emp->clear();
    }
};

TEST(ExchangeTest, insert_test)
{
    std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};
    std::unordered_map<std::string, std::string> map_empty;
    emp->declareExchange("exchange1", jiuqi::ExchangeType::DIRECT, true, false, map);
    emp->declareExchange("exchange2", jiuqi::ExchangeType::TOPIC, true, false, map);
    emp->declareExchange("exchange3", jiuqi::ExchangeType::FANOUT, true, false, map);
    emp->declareExchange("exchange4", jiuqi::ExchangeType::FANOUT, true, false, map_empty);
    emp->declareExchange("exchange5", jiuqi::ExchangeType::FANOUT, true, false, map_empty);
    emp->declareExchange("exchange6", jiuqi::ExchangeType::FANOUT, true, false, map_empty);
    ASSERT_EQ(emp->size(), 6);
}

TEST(ExchangeTest, select_test)
{
    jiuqi::Exchange::ptr exp = emp->selectExchange("exchange3");
    ASSERT_EQ(exp->name, "exchange3");
    ASSERT_EQ(exp->type, jiuqi::ExchangeType::FANOUT);
    ASSERT_EQ(exp->durable, true);
    ASSERT_EQ(exp->auto_delete, false);
    ASSERT_EQ(exp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}

TEST(ExchangeTest, delete_test)
{
    emp->deleteExchange("exchange1");
    jiuqi::Exchange::ptr exp = emp->selectExchange("exchange1");
    ASSERT_EQ(exp.get(), nullptr);
    ASSERT_EQ(emp->exists("exchange1"), false);
}

int main(int argc, char *argv[])
{
    testing::InitGoogleTest(&argc, argv);
    testing::AddGlobalTestEnvironment(new ExchangeTest);
    return RUN_ALL_TESTS();
}

        select测试可能会失败,原因是在交换机中我们使用unordered_map存储其他参数,由于顺序随机,所有使用getArgs获取的字符串中键值对的顺序也是随机的,但是不影响功能。


网站公告

今日签到

点亮在社区的每一天
去签到