交换机数据管理

发布于:2025-09-14 ⋅ 阅读:(22) ⋅ 点赞:(0)

交换机数据管理

代码如下:

#ifndef __M_EXCHANGE_H__
#define __M_EXCHANGE_H__
#include <cassert>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <google/protobuf/map.h>
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_msg.pb.h"


namespace xypmq
{
    // 1. Exchange definition
    /// Plain data record describing one exchange, plus helpers that convert the
    /// extra-argument map to and from a flat "key=value&key=value&" string.
    struct Exchange
    {
        using ptr = std::shared_ptr<Exchange>;
        // 1. Exchange name
        std::string name;
        // 2. Exchange type (value-initialized so a default-constructed object is never indeterminate)
        ExchangeType type{};
        // 3. Durability flag
        bool durable = false;
        // 4. Auto-delete flag
        bool auto_delete = false;
        // 5. Additional arguments
        google::protobuf::Map<std::string, std::string> args;

        /// Creates an empty exchange; every scalar member has a defined default.
        Exchange() = default;

        /// Creates an exchange from explicit field values (the argument map is copied).
        Exchange(const std::string &ename,
                 ExchangeType etype,
                 bool edurable,
                 bool eauto_delete,
                 const google::protobuf::Map<std::string, std::string> &eargs)
            : name(ename), type(etype), durable(edurable),
              auto_delete(eauto_delete), args(eargs) {}

        /// Parses a "key=value&key=value" string and stores each pair in `args`.
        /// Existing entries are kept; a repeated key is overwritten.
        /// Tokens that contain no '=' are ignored: without this guard find()
        /// returns npos and both substr() calls would yield the whole token,
        /// producing a bogus "token -> token" entry.
        /// @param str_args serialized arguments, as produced by getArgs().
        void setArgs(const std::string& str_args)
        {
            std::vector<std::string> sub_args;
            StrHelper::split(str_args,"&",sub_args);
            for(const auto& item:sub_args)
            {
                const size_t pos=item.find('=');
                if(pos==std::string::npos)
                {
                    continue;
                }
                args[item.substr(0,pos)] = item.substr(pos+1);
            }
        }

        /// Serializes `args` as "key=value&" repeated once per entry (note the
        /// trailing '&'). Keys and values are written verbatim, so an entry
        /// containing '&' or '=' will not survive a round trip through setArgs().
        /// @return the serialized string; empty when there are no arguments.
        std::string getArgs() const
        {
            std::string result;
            for(const auto& kv:args)
            {
                result+=kv.first;
                result+="=";
                result+=kv.second;
                result+="&";
            }
            return result;
        }
    };
    // Maps a name string to the shared Exchange object stored under it.
    using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
    //2. 定义交换机数据持久化管理类--数据存储在sqlite数据库中
    class ExchangeMapper
    {
    public:
        ExchangeMapper(const std::string &dbfile):_sql_helper(dbfile)
        {
            std::string path=FileHelper::parentDirectory(dbfile);
            FileHelper::createDirectory(path);
            assert(_sql_helper.open());
            createTable();
        }
        void createTable()
        {
            #define CREATE_TABLE "create table if not exists exchange_table(\
            name varchar(32) primary key,\
            type int,\
            durable int,\
            auto_delete int,\
            args varchar(128));"
            bool ret=_sql_helper.exec(CREATE_TABLE,nullptr,nullptr);
            if(ret==false)
            {
                DLOG("创建交换机数据库失败!");
                abort();//直接异常退出程序
            }
        }
        void removeTable()
        {
            #define DROP_TABLE "drop table if exists exchange_table;"
            bool ret=_sql_helper.exec(DROP_TABLE,nullptr,nullptr);
            if(ret==false)
            {
                DLOG("创建交换机数据库失败!");
                abort();//直接异常退出程序
            }
        }
        bool insert(Exchange::ptr &exp)
        {
            std::stringstream ss;
            ss << "insert into exchange_table values(";
            ss << "'" << exp->name << "', ";
            ss << exp->type << ", ";
            ss << exp->durable << ", ";
            ss << exp->auto_delete << ", ";
            ss << "'" << exp->getArgs() << "');";
            return _sql_helper.exec(ss.str(), nullptr, nullptr);
        }
        void remove(const std::string &name)
        {
            std::stringstream ss;
            ss << "delete from exchange_table where name=";
            ss << "'" << name << "';";
            _sql_helper.exec(ss.str(), nullptr, nullptr);
        }
        ExchangeMap recovery() 
        {
            ExchangeMap result;
            std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
            _sql_helper.exec(sql, selectCallback, &result);
            return result;
        }
     private:
            static int selectCallback(void* arg,int numcol,char** row,char** fields) 
            {
                ExchangeMap *result = (ExchangeMap*)arg;
                auto exp = std::make_shared<Exchange>();
                exp->name = row[0];
                exp->type = (xypmq::ExchangeType)std::stoi(row[1]);
                exp->durable = (bool)std::stoi(row[2]);
                exp->auto_delete = (bool)std::stoi(row[3]);
                if (row[4]) exp->setArgs(row[4]);
                result->insert(std::make_pair(exp->name, exp));
                return 0;
            }
    private:
        SqliteHelper _sql_helper;
    };

    /// In-memory registry of exchanges backed by an ExchangeMapper.
    /// Every public member function except the constructor locks `_mutex`
    /// for its whole body.
    class ExchangeManager
    {
    public:
        using ptr = std::shared_ptr<ExchangeManager>;

        /// Opens the persistence layer for `dbfile` and loads all stored exchanges.
        ExchangeManager(const std::string &dbfile):_mapper(dbfile)
        {
            _exchanges=_mapper.recovery();
        }

        /// Registers an exchange unless one with the same name already exists.
        /// A durable exchange is written to the database first and is only added
        /// to memory when that write succeeds.
        /// @return true when the exchange exists after the call (newly created or
        ///         already present); false when persisting a durable exchange failed.
        bool declareExchange(const std::string& name,
                            ExchangeType type,
                            bool durable,
                            bool auto_delete,
                            const google::protobuf::Map<std::string, std::string>& args)
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if(_exchanges.find(name)!=_exchanges.end())
            {
                return true;
            }
            auto exp=std::make_shared<Exchange>(name,type,durable,auto_delete,args);
            if(durable)
            {
                if(!_mapper.insert(exp))
                {
                    return false;
                }
            }
            _exchanges.insert(std::make_pair(name,exp));
            return true;
        }

        /// Removes the named exchange from memory and, when it is durable, from
        /// the database. Does nothing if the name is unknown.
        void deleteExchange(const std::string &name)
        {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it=_exchanges.find(name);
            if(it==_exchanges.end())
            {
                return;
            }
            if(it->second->durable)
            {
                _mapper.remove(name);
            }
            // Erase through the iterator already found instead of looking the key up again.
            _exchanges.erase(it);
        }

        /// @return the named exchange, or an empty pointer when it does not exist.
        Exchange::ptr selectExchange(const std::string &name)
        {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it=_exchanges.find(name);
            if(it==_exchanges.end())
            {
                return Exchange::ptr();
            }
            return it->second;
        }

        /// @return true when an exchange with this name is registered.
        bool exists(const std::string &name)
        {
            std::lock_guard<std::mutex> lock(_mutex);
            return _exchanges.find(name)!=_exchanges.end();
        }

        /// @return the number of registered exchanges.
        size_t size()
        {
            std::lock_guard<std::mutex> lock(_mutex);
            return _exchanges.size();
        }

        /// Removes every exchange from the database and from memory.
        void clear()
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _mapper.removeTable();
            // Recreate the (now empty) table: without it every later durable
            // declareExchange() would fail on insert because the table is gone.
            _mapper.createTable();
            _exchanges.clear();
        }
    private:
        std::mutex _mutex;
        ExchangeMapper _mapper;
        ExchangeMap _exchanges;
    };
} 
#endif