目录
客户端操作编写(mysqlMessageTable.hpp)
服务端创建子类(MessageStoreServiceImpl)完成RPC服务调用函数重写
GetTimeRangeMessage(获取指定时间内的消息)
服务端完成消息转发子服务类(MessageStoreServer)
本章节,主要对项目中消息存储子服务模块进行分析、开发与测试。
功能设计
消息存储子服务,主要用于管理消息的存储,其中:
文本消息:存储在ES文档搜索服务器中。
文件/语音/图片:存储到文件管理子服务中。
而消息存储,是服务器内部做的事,不需要和外部用户进行交互,因此不做功能性接口提供,只在内部实现。
除了对消息的存储管理,还需要管理消息的搜索获取,需要提供3个功能性接口:
1、获取最近N条消息:用户登陆成功后,点开会话框,显示最近的消息。
2、获取指定时间段内的消息:用户可以进行聊天消息的按时间搜索。
3、关键字消息搜索:用户可以在会话框中进行聊天消息的关键字搜索。
模块划分
参数/配置文件解析模块 | 基于gflags框架直接使用,进行参数/配置文件的解析。 |
日志模块 | 基于spdlog封装的logger 直接进行日志输出。 |
服务注册模块 | 基于etcd框架封装的注册模块 直接进行消息存储子服务模块的服务注册。 |
RPC服务模块 | 基于brpc框架 搭建消息存储子服务的RPC服务器。 |
服务发现与调用模块 | 基于etcd框架封装的服务发现与brpc框架封装的服务调用模块。 1、发现文件管理子服务,将文件/语音/图片类型的消息以及用户头像之类的文件,存储到文件管理子服务。 2、发现用户管理子服务,在搜索消息的时候,根据发送者的用户ID获取发送者用户信息进行显示。 |
数据库数据操作模块 | 基于odb-mysql数据管理封装的模块,实现关系型数据库中数据的操作。 1、用于从RabbitMQ中消费到消息后,将消息向数据库中存储一份,便于通过时间搜索获取消息。 |
ES客户端模块 | 基于ES框架封装的信息存储功能,向ES服务器进行文本聊天消息的存储,以便于文本消息的关键字搜索。 |
RabbitMQ消费模块 | 基于rabbitmq-client封装的消费模块,从消息队列服务器消费获取到聊天消息,将文本消息存储到ES,文件消息存储到文件管理子服务。 |
业务接口/功能示意图
消息存储
获取最近N条消息
获取指定时间段的消息
消息关键字搜索
服务实现流程
数据管理
MySQL(消息信息管理)
在消息存储子服务中,所有的消息的简息都要在MySQL中存储一份,进行消息的持久化。
以便于进行时间范围性查询和离线消息的实现。
消息类型有四种:文本、文件、语音、图片。
不可能把文件数据存到数据库中,因此数据库中只存储文本消息以及其它类型消息的元信息。
消息表结构:
消息ID | 消息的唯一性标识。 |
消息产生时间 | 用于进行时间性查询。 |
消息发送者用户ID | 明确消息发送者。 |
消息产生的聊天会话ID | 明确消息属于哪个会话。 |
消息类型 | 明确消息的类型。 |
消息内容 | 只存储文本消息,文件类型消息存储在文件管理子服务。 |
文件ID | 只有文件消息才有。 |
文件大小 | 只有文件消息才有。 |
文件名称 | 只有文件消息才有。 |
对应提供的操作:
1、新增消息。
2、根据聊天会话ID, 删除该会话中的所有消息信息。
3、通过会话ID、消息数量,获取最近N条消息(逆序 + limitN即可)。
4、通过会话ID、时间范围,获取指定时间段内的消息,并按时间进行排序。
ES(文本消息管理)
为了实现消息的关键字搜索功能,将消息内容存入ES中。
如果在数据库中进行关键字的模糊匹配,效率非常低下,因此选用ES。
但是在搜索的时候,需要进行聊天会话的过滤,因此这里也要考虑ES索引的构造。
消息文档索引(Index)
POST /message/_doc
{
"settings": {
"analysis": {
"analyzer": {
"ik": {
"tokenizer": "ik_max_word"
}
}
}
},
"mappings": {
"dynamic": true,
"properties": {
"chat_session_id": {
"type": "keyword",
"analyzer": "standard"
},
"message_id": {
"type": "keyword",
"analyzer": "standard"
},
"content": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
其中关键类型就有:chat_session_id、type、message_id、content。
新增测试数据
POST /message/_doc/_bulk
{"index":{"_id":"1"}}
{"chat_session_id":"会话ID1","message_id":"消息ID1","content":"吃饭了么?"}
{"index":{"_id":"2"}}
{"chat_session_id":"会话ID1","message_id":"消息ID2","content":"吃的盖浇饭。"}
{"index":{"_id":"3"}}
{"chat_session_id":"会话ID2","message_id":"消息ID3","content":"昨天吃饭了么?"}
{"index":{"_id":"4"}}
{"chat_session_id":"会话ID2","message_id":"消息ID4","content":"昨天吃的盖浇饭。"}
查看数据
GET /message/_doc/_search?pretty
{
"query": {
"match_all": {}
}
}
查看结果:
成功。
删除该索引
DELETE /message
总体流程
1、编写服务所需的proto文件,利用protoc工具生成RPC服务器所需的.pb.h 和 .pb.cc 项目文件。 |
2、服务端 创建子类,继承于proto文件中RPC调用类,并进行功能性接口函数重写。 |
3、服务端 完成消息存储子服务类。 |
4、实例化 服务类对象,启动服务 |
服务代码实现
数据管理
MySQL(消息信息管理)
Message(ODB文件编写)
想要实现MySQL对消息信息的管理,那么首先需要通过ODB编程,构造一个Message表。
message.hxx:
#pragma once
#include <iostream>
#include <odb/nullable.hxx>
#include <odb/core.hxx>
#include <boost/date_time/posix_time/posix_time.hpp>
// odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time message.hxx
namespace yangz
{
#pragma db object table("Message")
class Message
{
public:
Message() {}
Message(const std::string &message_id,
const boost::posix_time::ptime &create_time,
const std::string &user_id,
const std::string &chat_session_id,
const unsigned char &message_type)
: _message_id(message_id), _create_time(create_time), _user_id(user_id), _chat_session_id(chat_session_id), _message_type(message_type)
{
}
~Message() {}
public:
void set_message_id(const std::string &message_id) { _message_id = message_id; }
std::string get_message_id() { return _message_id; }
void set_create_time(const boost::posix_time::ptime &create_time) { _create_time = create_time; }
boost::posix_time::ptime get_create_time() { return _create_time; }
void set_user_id(const std::string &user_id) { _user_id = user_id; }
std::string get_user_id() { return _user_id; }
void set_chat_session_id(const std::string &chat_session_id) { _chat_session_id = chat_session_id; }
std::string get_chat_session_id() { return _chat_session_id; }
void set_message_type(unsigned char message_type) { _message_type = message_type; }
unsigned char get_message_type() { return _message_type; }
void set_message_content(const std::string &message_content) { _message_content = message_content; }
std::string get_message_content()
{
if (_message_content)
return *_message_content;
return std::string();
}
void set_file_id(const std::string &file_id) { _file_id = file_id; }
std::string get_file_id()
{
if (_file_id)
return *_file_id;
return std::string();
}
void set_file_size(unsigned int file_size) { _file_size = file_size; }
unsigned int get_file_size()
{
if (_file_size)
return *_file_size;
return 0;
}
void set_file_name(const std::string &file_name) { _file_name = file_name; }
std::string get_file_name()
{
if (_file_name)
return *_file_name;
return std::string();
}
private:
friend class odb::access;
#pragma db id auto
unsigned long _id; // 自增主键
#pragma db type("varchar(64)") index unique
std::string _message_id; // 消息ID, varchar(64), 被索引, 唯一性约束
#pragma db type("TIMESTAMP")
boost::posix_time::ptime _create_time; // 消息产生的时间, timestamp
#pragma db type("varchar(64)")
std::string _user_id; // 消息发送者的用户ID, varchar(64)
#pragma db type("varchar(64)")
std::string _chat_session_id; // 消息所属会话ID, varchar(64)
unsigned char _message_type; // 消息类型: 0-文本 1-图片 2-文件 3-语音
odb::nullable<std::string> _message_content; // 消息内容(文本消息)
#pragma db type("varchar(64)")
odb::nullable<std::string> _file_id; // 文件ID(文件消息)
odb::nullable<unsigned int> _file_size; // 文件大小(文件消息)
#pragma db type("varchar(128)")
odb::nullable<std::string> _file_name; // 文件名(文件消息)
};
}
编译生成sql文件指令:
odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time message.hxx
此时在 .sql文件里新增:
然后将该.sql 文件导入数据库中:
mysql -uroot -p 'MicroChat' < message.sql
Enter password:
现在在数据库中就有对应的表了:
客户端操作编写(mysqlMessageTable.hpp)
对应提供的操作:
1、新增消息。
2、根据聊天会话ID, 删除该会话中的所有消息信息。
3、通过会话ID、消息数量,获取最近N条消息(逆序 + limitN即可),并按时间进行排序。
4、通过会话ID、时间范围,获取指定时间段内的消息,并按时间进行排序。
#pragma once
#include "odbMysqlHandleFactory.hpp"
#include "message.hxx"
#include "message-odb.hxx"
#include "logger.hpp"
namespace yangz
{
class MessageTableClient
{
public:
using ptr = std::shared_ptr<MessageTableClient>;
MessageTableClient(const std::shared_ptr<odb::core::database> &mysql_client) : _mysql_client(mysql_client) {}
public:
// 新增消息信息
bool insert(Message &message)
{
try
{
odb::transaction trans(_mysql_client->begin());
_mysql_client->persist(message);
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("新增消息信息失败, 消息ID: {}, 失败原因: {}", message.get_message_id(), e.what());
return false;
}
return true;
}
// 根据聊天会话ID,删除会话中的信息
bool remove(const std::string &chat_session_id)
{
try
{
odb::transaction trans(_mysql_client->begin());
typedef odb::query<Message> query;
typedef odb::result<Message> result;
_mysql_client->erase_query<Message>(query::chat_session_id == chat_session_id);
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("根据聊天会话ID删除聊天会话消息信息失败, chat_session_id: {}, 失败原因: {}", chat_session_id, e.what());
return false;
}
return true;
}
// 通过会话ID、消息数量,获取最近N条消息(逆序 + limitN即可), 并按时间进行排序
std::vector<Message> get_recent_N_message(const std::string &cssid, int N)
{
std::vector<Message> messages;
try
{
odb::transaction trans(_mysql_client->begin());
typedef odb::query<Message> query;
typedef odb::result<Message> result;
// select * from message where [chat_session_id = cssid order by create_time DESC limit N]
std::stringstream condition;
condition << "chat_session_id = '" << cssid << "' ";
condition << "order by create_time DESC limit " << N;
result r(_mysql_client->query<Message>(condition.str()));
for (result::iterator i(r.begin()); i != r.end(); ++i)
messages.push_back(*i);
std::reverse(messages.begin(), messages.end()); // 按照时间的先后顺序调整
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("获取最近N条消息失败, chat_session_id: {}, 失败原因: {}", cssid, e.what());
}
return messages;
}
// 通过会话ID、时间范围,获取指定时间段内的消息,并按时间进行排序
std::vector<Message> get_time_range_message(const std::string &cssid,
boost::posix_time::ptime &start_time,
boost::posix_time::ptime &end_time)
{
std::vector<Message> messages;
try
{
odb::transaction trans(_mysql_client->begin());
typedef odb::query<Message> query;
typedef odb::result<Message> result;
result r(_mysql_client->query<Message>(query::chat_session_id == cssid &&
query::create_time >= start_time &&
query::create_time <= end_time));
for (result::iterator i(r.begin()); i != r.end(); ++i)
messages.push_back(*i);
std::reverse(messages.begin(), messages.end()); // 按照时间的先后顺序调整
trans.commit();
}
catch (const std::exception &e)
{
LOG_ERROR("获取时间段内消息失败, chat_session_id: {}, start_time: {}, end_time: {} 失败原因: {}",
cssid, boost::posix_time::to_simple_string(start_time), boost::posix_time::to_simple_string(end_time), e.what());
}
return messages;
}
private:
std::shared_ptr<odb::core::database> _mysql_client;
};
}
ES(文本消息管理)
提供以下几个操作:
1、创建索引库。
2、添加文本新数据。
3、通过消息ID,移除消息信息。
4、通过关键字,查找/筛选数据。
elasticsearchDataManage.hpp:
class ESMessageInfoManage
{
public:
using ptr = std::shared_ptr<ESMessageInfoManage>;
ESMessageInfoManage(const std::shared_ptr<elasticlient::Client> &es_client) : _es_client(es_client) {}
~ESMessageInfoManage() {}
public:
// 创建消息信息索引
bool createIndex()
{
bool res = ESIndexCreate(_es_client, "message")
.append("user_id", "keyword", "standard", false)
.append("message_id", "keyword", "standard", false)
.append("create_time", "long", "standard", false)
.append("chat_session_id", "keyword", "standard", true)
.append("message_content")
.create();
if (res == false)
{
LOG_INFO("消息信息索引创建失败");
return false;
}
return true;
}
// 新增消息数据
bool appendData(const std::string &user_id,
const std::string &message_id,
const long create_time,
const std::string &chat_session_id,
const std::string &message_content)
{
bool res = ESDataInsert(_es_client, "message")
.append("user_id", user_id)
.append("message_id", message_id)
.append("create_time", create_time)
.append("chat_session_id", chat_session_id)
.append("message_content", message_content)
.insert(message_id);
if (res == false)
{
LOG_ERROR("消息数据插入失败");
return false;
}
return true;
}
// 通过消息id 移除该消息
bool remove(const std::string &message_ud)
{
bool ret = ESDataRemove(_es_client, "message").remove(message_ud);
if (ret == false)
{
LOG_ERROR("消息数据删除失败!");
return false;
}
LOG_INFO("消息数据删除成功!");
return true;
}
// 关键字查询消息 - 关键字 + 会话ID
std::vector<yangz::Message> search(const std::string &key, const std::string &cssid)
{
std::vector<yangz::Message> messages;
Json::Value json_message = ESDataSearch(_es_client, "message")
.append_must_term("chat_session_id.keyword", cssid)
.append_must_match("message_content", key)
.search();
if (json_message.isArray() == false)
{
LOG_ERROR("消息搜索结果为空,或者结果不是数组类型");
return messages;
}
int sz = json_message.size();
for (int i = 0; i < sz; i++)
{
yangz::Message message;
message.set_user_id(json_message[i]["_source"]["user_id"].asString());
message.set_message_id(json_message[i]["_source"]["message_id"].asString());
boost::posix_time::ptime ctime(boost::posix_time::from_time_t(
json_message[i]["_source"]["create_time"].asInt64()));
message.set_create_time(ctime);
message.set_chat_session_id(json_message[i]["_source"]["chat_session_id"].asString());
message.set_message_content(json_message[i]["_source"]["message_content"].asString());
messages.push_back(message);
}
return messages;
}
private:
std::shared_ptr<elasticlient::Client> _es_client;
};
编写proto文件
消息存储proto
既然消息模块有3个功能性接口,那么就有3个对应的请求与响应结构,以及最终的PRC调用(messageStore.proto)。
其中消息存储是内部接口,不做功能性接口显示。