目录
交换机数据管理
- 定义交换机数据类
- 交换机名称
- 交换机类型
- 是否持久化标志
- 是否自动删除标志
- 其他参数
- 定义交换机数据持久化类(数据持久化的 sqlite3 数据库中)
- 创建/删除交换机数据表
- 新增交换机数据
- 移除交换机数据
- 查询所有交换机数据
- 查询指定交换机数据(根据名称)
- 定义交换机数据管理类
- 声明交换机,并添加管理(存在则 OK,不存在则创建)
- 删除交换机
- 获取指定交换机
- 销毁所有交换机数据
交换机数据类
struct Exchange
{
using ptr = std::shared_ptr<Exchange>;
// 交换机名称
std::string name;
// 交换机类型
ExchangeType type;
// 持久化标志
bool durable;
// 自动删除标志
bool auto_delete;
// 其他参数
std::unordered_map<std::string, std::string> args;
Exchange() {}
Exchange(const std::string &ename,
ExchangeType etype,
bool edurable,
bool eauto_delete,
const std::unordered_map<std::string, std::string> &eargs)
: name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs)
{
}
void setArgs(const std::string &str_args)
{
// key=val&key=val.....
std::vector<std::string> sub_args;
StrHelper::split(str_args, "&", sub_args);
for (auto &arg : sub_args)
{
size_t pos = arg.find("=");
std::string key = arg.substr(0, pos);
std::string val = arg.substr(pos + 1);
args.insert(std::make_pair(key, val));
}
}
std::string getArgs()
{
if (args.empty())
return "";
std::string result;
for (auto &arg : args)
{
result += arg.first + "=" + arg.second + "&";
}
result.pop_back();
return result;
}
};
需要说明的有:
- 对于其他参数,我们用一个unordered_map容器存储,与相应的字符串转化的格式是:key=val&key=val&key=val....
- 在getArgs函数中必须添加对args的判空操作,否则,args为空时,result会为空,调用pop_back函数会发生段错误。
交换机数据持久化类
class ExchangeMapper
{
public:
ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile)
{
std::string path = FileHelper::parentDirectory(dbfile);
FileHelper::createDirectory(path);
assert(_sql_helper.open());
createTable();
}
void createTable()
{
#define CREATE_TABLE "create table if not exists exchange_table(\
name varchar(32) primary key, \
type int, \
durable int, \
auto_delete int, \
args varchar(128));"
bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
if (!ret)
{
ERROR("创建交换机数据库表失败");
abort();
}
}
void removeTable()
{
#define DROP_TABLE "drop table if exists exchange_table;"
bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);
if (!ret)
{
ERROR("删除交换机数据库表失败");
abort();
}
}
void insert(Exchange::ptr &exchange)
{
std::stringstream ss;
ss << "insert into exchange_table values('"
<< exchange->name << "',"
<< exchange->type << ","
<< exchange->durable << ","
<< exchange->auto_delete << ","
<< "'" << exchange->getArgs() << "');";
bool ret = _sql_helper.exec(ss.str(), nullptr, nullptr);
if (!ret)
return;
}
void remove(const std::string &name)
{
std::stringstream ss;
ss << "delete from exchange_table where name = "
<< "'" << name << "';";
_sql_helper.exec(ss.str(), nullptr, nullptr);
}
using ExchangMap = std::unordered_map<std::string, Exchange::ptr>;
ExchangMap recovery()
{
ExchangMap result;
std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";
_sql_helper.exec(sql, selectCallback, &result);
return result;
}
private:
static int selectCallback(void *arg, int numcol, char **row, char **fields)
{
ExchangMap *result = (ExchangMap *)arg;
auto exp = std::make_shared<Exchange>();
exp->name = row[0];
exp->type = (ExchangeType)std::stoi(row[1]);
exp->durable = (bool)std::stoi(row[2]);
exp->auto_delete = (bool)std::stoi(row[3]);
if (row[4])
exp->setArgs(row[4]);
result->insert(std::make_pair(exp->name, exp));
return 0;
}
private:
SqliteHelper _sql_helper;
};
说明:
- 在构造对象时就把相应的数据库打开,如果没有就创建,但要注意一定要先创建好数据库所在目录,在创建表(如果不存在)。
- 前四个函数就是对sql语言的一个使用。
- recovery方法中我们将查询出来的结果以交换机名称为键值,存储在一个unordered_map容器中,以拿到所有交换机。
交换机数据管理类
class ExchangeManager
{
public:
using ptr = std::shared_ptr<ExchangeManager>;
ExchangeManager(const std::string &dbfile) : _mapper(dbfile)
{
_exchanges = _mapper.recovery();
}
// 声明交换机
void declareExchange(const std::string &name,
ExchangeType type,
bool durable,
bool auto_delete,
std::unordered_map<std::string, std::string> &args)
{
std::unique_lock<std::mutex> lock(_mutex);
// 先判断是否存在
auto it = _exchanges.find(name);
if (it != _exchanges.end())
return;
// 新增交换机
auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
_exchanges.insert(std::make_pair(name, exp));
if (durable)
_mapper.insert(exp);
}
// 删除交换机
void deleteExchange(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
// 先判断是否存在
auto it = _exchanges.find(name);
if (it == _exchanges.end())
return;
if (_exchanges[name]->durable)
_mapper.remove(name);
_exchanges.erase(name);
}
// 获取指定交换机
Exchange::ptr selectExchange(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
// 先判断是否存在
auto it = _exchanges.find(name);
if (it == _exchanges.end())
return nullptr;
return it->second;
}
// 判断交换机是否存在
bool exists(const std::string &name)
{
std::unique_lock<std::mutex> lock(_mutex);
// 先判断是否存在
auto it = _exchanges.find(name);
if (it == _exchanges.end())
return false;
return true;
}
size_t size()
{
std::unique_lock<std::mutex> lock(_mutex);
return _exchanges.size();
}
// 清理所有交换机数据
void clear()
{
std::unique_lock<std::mutex> lock(_mutex);
_mapper.removeTable();
_exchanges.clear();
}
private:
std::mutex _mutex;
ExchangeMapper _mapper;
std::unordered_map<std::string, Exchange::ptr> _exchanges;
};
说明:
- 此类是真正要让用户使用的类,包含有几个成员属性:
- 互斥锁:支持多线程同时访问。
- ExchangeMapper对象:对数据库进行操作,获取所有交换机。
- _exchanges:存储所有交换机,方便用户获取。
- 在构造时就调用_mapper的recovery方法拿到所有交换机。
- 所有方法对于判断交换机是否存在的操作必须独自进行,不能复用exists方法,否则会导致死锁。
测试
#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>
jiuqi::ExchangeManager::ptr emp;
class ExchangeTest : public testing::Environment
{
public:
virtual void SetUp() override
{
emp = std::make_shared<jiuqi::ExchangeManager>("./data/meta.db");
}
virtual void TearDown() override
{
// emp->clear();
}
};
TEST(ExchangeTest, insert_test)
{
std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};
std::unordered_map<std::string, std::string> map_empty;
emp->declareExchange("exchange1", jiuqi::ExchangeType::DIRECT, true, false, map);
emp->declareExchange("exchange2", jiuqi::ExchangeType::TOPIC, true, false, map);
emp->declareExchange("exchange3", jiuqi::ExchangeType::FANOUT, true, false, map);
emp->declareExchange("exchange4", jiuqi::ExchangeType::FANOUT, true, false, map_empty);
emp->declareExchange("exchange5", jiuqi::ExchangeType::FANOUT, true, false, map_empty);
emp->declareExchange("exchange6", jiuqi::ExchangeType::FANOUT, true, false, map_empty);
ASSERT_EQ(emp->size(), 6);
}
TEST(ExchangeTest, select_test)
{
jiuqi::Exchange::ptr exp = emp->selectExchange("exchange3");
ASSERT_EQ(exp->name, "exchange3");
ASSERT_EQ(exp->type, jiuqi::ExchangeType::FANOUT);
ASSERT_EQ(exp->durable, true);
ASSERT_EQ(exp->auto_delete, false);
ASSERT_EQ(exp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}
TEST(ExchangeTest, delete_test)
{
emp->deleteExchange("exchange1");
jiuqi::Exchange::ptr exp = emp->selectExchange("exchange1");
ASSERT_EQ(exp.get(), nullptr);
ASSERT_EQ(emp->exists("exchange1"), false);
}
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);
testing::AddGlobalTestEnvironment(new ExchangeTest);
return RUN_ALL_TESTS();
}
select测试可能会失败,原因是在交换机中我们使用unordered_map存储其他参数,由于顺序随机,所有使用getArgs获取的字符串中键值对的顺序也是随机的,但是不影响功能。