微服务即时通信系统(十)---消息存储子服务

发布于:2025-03-28 ⋅ 阅读:(23) ⋅ 点赞:(0)

目录

功能设计

模块划分

业务接口/功能示意图

消息存储

获取最近N条消息

获取指定时间段的消息

消息关键字搜索

服务实现流程

数据管理

MySQL(消息信息管理)

ES(文本消息管理)

消息文档索引(Index)

新增测试数据

查看数据

删除该索引

总体流程

服务代码实现

数据管理

MySQL(消息信息管理)

Message(ODB文件编写) 

客户端操作编写(mysqlMessageTable.hpp)

ES(文本消息管理)

编写proto文件

消息存储proto

获取最近的N条消息

获取指定时间内的消息

关键字搜索消息

RPC调用

服务端创建子类(MessageStoreServiceImpl)完成RPC服务调用函数重写

MessageStore(消息存储,内部接口)

GetRecentNMessage(获取最近N条消息)

GetTimeRangeMessage(获取指定时间内的消息)

GetMessageByKeyword(关键字搜索消息)

RPC服务端代码(总)

服务端完成消息转发子服务类(MessageStoreServer)

注意

实例化服务类对象,启动服务

工程系统构建配置文件(CMakeLists.txt)

服务测试

ES测试

MySQL测试

客户端测试


本章节,主要对项目中消息存储子服务模块进行分析、开发与测试。

功能设计

消息存储子服务,主要用于管理消息的存储,其中:

文本消息:存储在ES文档搜索服务器中。

文件/语音/图片:存储到文件管理子服务中。

而消息存储,是服务器内部做的事,不需要和外部用户进行交互,因此不做功能性接口提供,只在内部实现。 

除了对消息的存储管理,还需要管理消息的搜索获取,需要提供3个功能性接口:

1、获取最近N条消息:用户登陆成功后,点开会话框,显示最近的消息。

2、获取指定时间段内的消息:用户可以进行聊天消息的按时间搜索。

3、关键字消息搜索:用户可以在会话框中进行聊天消息的关键字搜索。

模块划分

参数/配置文件解析模块 基于gflags框架直接使用,进行参数/配置文件的解析。
日志模块 基于spdlog封装的logger 直接进行日志输出。
服务注册模块 基于etcd框架封装的注册模块 直接进行消息存储子服务模块的服务注册。
RPC服务模块 基于brpc框架 搭建消息存储子服务的RPC服务器。
服务发现与调用模块

基于etcd框架封装的服务发现与brpc框架封装的服务调用模块。

1、发现文件管理子服务,将文件/语音/图片类型的消息以及用户头像之类的文件,存储到文件管理子服务。

2、发现用户管理子服务,在搜索消息的时候,根据发送者的用户ID获取发送者用户信息进行显示。

数据库数据操作模块

基于odb-mysql数据管理封装的模块,实现关系型数据库中数据的操作。

1、用于从RabbitMQ中消费到消息后,将消息向数据库中存储一份,便于通过时间搜索获取消息。

ES客户端模块 基于ES框架封装的信息存储功能,向ES服务器进行文本聊天消息的存储,以便于文本消息的关键字搜索。
RabbitMQ消费模块 基于rabbitmq-client封装的消费模块,从消息队列服务器消费获取到聊天消息,将文本消息存储到ES,文件消息存储到文件管理子服务。

业务接口/功能示意图

消息存储

获取最近N条消息

获取指定时间段的消息

消息关键字搜索

服务实现流程

数据管理

MySQL(消息信息管理)

在消息存储子服务中,所有的消息的简息都要在MySQL中存储一份,进行消息的持久化。

以便于进行时间范围性查询和离线消息的实现。

消息类型有四种:文本、文件、语音、图片。

不可能把文件数据存到数据库中,因此数据库中只存储文本消息以及其它类型消息的元信息。

消息表结构:

消息ID 消息的唯一性标识。
消息产生时间 用于进行时间性查询。
消息发送者用户ID 明确消息发送者。
消息产生的聊天会话ID 明确消息属于哪个会话。
消息类型 明确消息的类型。
消息内容 只存储文本消息,文件类型消息存储在文件管理子服务。
文件ID 只有文件消息才有。
文件大小 只有文件消息才有。
文件名称 只有文件消息才有。

对应提供的操作:

1、新增消息。

2、根据聊天会话ID, 删除该会话中的所有消息信息。

3、通过会话ID、消息数量,获取最近N条消息(逆序 + limitN即可)。

4、通过会话ID、时间范围,获取指定时间段内的消息,并按时间进行排序。

ES(文本消息管理)

为了实现消息的关键字搜索功能,将消息内容存入ES中。

如果在数据库中进行关键字的模糊匹配,效率非常低下,因此选用ES。

但是在搜索的时候,需要进行聊天会话的过滤,因此这里也要考虑ES索引的构造。

消息文档索引(Index)
POST /message/_doc
{
  "settings": {
    "analysis": {
      "analyzer": {
        "ik": {
          "tokenizer": "ik_max_word"
        }
      }
    }
  },
  "mappings": {
    "dynamic": true,
    "properties": {
      "chat_session_id": {
        "type": "keyword",
        "analyzer": "standard"
      },
      "message_id": {
        "type": "keyword",
        "analyzer": "standard"
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_word"
      }
    }
  }
} 

其中关键类型就有:chat_session_id、type、message_id、content。

新增测试数据
POST /message/_doc/_bulk
{"index":{"_id":"1"}}
{"chat_session_id":"会话ID1","message_id":"消息ID1","content":"吃饭了么?"}
{"index":{"_id":"2"}}
{"chat_session_id":"会话ID1","message_id":"消息ID2","content":"吃的盖浇饭。"}
{"index":{"_id":"3"}}
{"chat_session_id":"会话ID2","message_id":"消息ID3","content":"昨天吃饭了么?"}
{"index":{"_id":"4"}}
{"chat_session_id":"会话ID2","message_id":"消息ID4","content":"昨天吃的盖浇饭。"}
查看数据
GET /message/_doc/_search?pretty 
{ 
    "query": { 
        "match_all": {} 
    } 
} 

查看结果:

成功。

删除该索引
DELETE /message

总体流程

1、编写服务所需的proto文件,利用protoc工具生成RPC服务器所需的.pb.h 和 .pb.cc 项目文件。
2、服务端 创建子类,继承于proto文件中RPC调用类,并进行功能性接口函数重写。
3、服务端 完成消息存储子服务类。
4、实例化 服务类对象,启动服务

服务代码实现

数据管理

MySQL(消息信息管理)

Message(ODB文件编写) 

想要实现MySQL对消息信息的管理,那么首先需要通过ODB编程,构造一个Message表。

message.hxx:

#pragma once
#include <iostream>
#include <odb/nullable.hxx>
#include <odb/core.hxx>
#include <boost/date_time/posix_time/posix_time.hpp>

// odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time message.hxx

namespace yangz
{
#pragma db object table("Message")
    class Message
    {
    public:
        Message() {}
        Message(const std::string &message_id,
                const boost::posix_time::ptime &create_time,
                const std::string &user_id,
                const std::string &chat_session_id,
                const unsigned char &message_type)
            : _message_id(message_id), _create_time(create_time), _user_id(user_id), _chat_session_id(chat_session_id), _message_type(message_type)
        {
        }
        ~Message() {}

    public:
        void set_message_id(const std::string &message_id) { _message_id = message_id; }
        std::string get_message_id() { return _message_id; }

        void set_create_time(const boost::posix_time::ptime &create_time) { _create_time = create_time; }
        boost::posix_time::ptime get_create_time() { return _create_time; }

        void set_user_id(const std::string &user_id) { _user_id = user_id; }
        std::string get_user_id() { return _user_id; }

        void set_chat_session_id(const std::string &chat_session_id) { _chat_session_id = chat_session_id; }
        std::string get_chat_session_id() { return _chat_session_id; }

        void set_message_type(unsigned char message_type) { _message_type = message_type; }
        unsigned char get_message_type() { return _message_type; }

        void set_message_content(const std::string &message_content) { _message_content = message_content; }
        std::string get_message_content()
        {
            if (_message_content)
                return *_message_content;
            return std::string();
        }

        void set_file_id(const std::string &file_id) { _file_id = file_id; }
        std::string get_file_id()
        {
            if (_file_id)
                return *_file_id;
            return std::string();
        }

        void set_file_size(unsigned int file_size) { _file_size = file_size; }
        unsigned int get_file_size()
        {
            if (_file_size)
                return *_file_size;
            return 0;
        }

        void set_file_name(const std::string &file_name) { _file_name = file_name; }
        std::string get_file_name()
        {
            if (_file_name)
                return *_file_name;
            return std::string();
        }

    private:
        friend class odb::access;
#pragma db id auto
        unsigned long _id; // 自增主键
#pragma db type("varchar(64)") index unique
        std::string _message_id; // 消息ID, varchar(64), 被索引, 唯一性约束
#pragma db type("TIMESTAMP")
        boost::posix_time::ptime _create_time; // 消息产生的时间, timestamp
#pragma db type("varchar(64)")
        std::string _user_id; // 消息发送者的用户ID, varchar(64)
#pragma db type("varchar(64)")
        std::string _chat_session_id; // 消息所属会话ID, varchar(64)

        unsigned char _message_type;                 // 消息类型: 0-文本 1-图片 2-文件 3-语音
        odb::nullable<std::string> _message_content; // 消息内容(文本消息)
#pragma db type("varchar(64)")
        odb::nullable<std::string> _file_id;    // 文件ID(文件消息)
        odb::nullable<unsigned int> _file_size; // 文件大小(文件消息)
#pragma db type("varchar(128)")
        odb::nullable<std::string> _file_name; // 文件名(文件消息)
    };
}

编译生成sql文件指令:

odb -d mysql --std c++11 --generate-query --generate-schema --profile boost/date-time message.hxx

此时在 .sql文件里新增:

然后将该.sql 文件导入数据库中:

mysql -uroot -p 'MicroChat' < message.sql 
Enter password:

现在在数据库中就有对应的表了: 

客户端操作编写(mysqlMessageTable.hpp)

对应提供的操作:

1、新增消息。

2、根据聊天会话ID, 删除该会话中的所有消息信息。

3、通过会话ID、消息数量,获取最近N条消息(逆序 + limitN即可),并按时间进行排序。

4、通过会话ID、时间范围,获取指定时间段内的消息,并按时间进行排序。

#pragma once
#include "odbMysqlHandleFactory.hpp"
#include "message.hxx"
#include "message-odb.hxx"
#include "logger.hpp"

namespace yangz
{
    class MessageTableClient
    {
    public:
        using ptr = std::shared_ptr<MessageTableClient>;
        MessageTableClient(const std::shared_ptr<odb::core::database> &mysql_client) : _mysql_client(mysql_client) {}

    public:
        // 新增消息信息
        bool insert(Message &message)
        {
            try
            {
                odb::transaction trans(_mysql_client->begin());
                _mysql_client->persist(message);
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("新增消息信息失败, 消息ID: {}, 失败原因: {}", message.get_message_id(), e.what());
                return false;
            }
            return true;
        }

        // 根据聊天会话ID,删除会话中的信息
        bool remove(const std::string &chat_session_id)
        {
            try
            {
                odb::transaction trans(_mysql_client->begin());
                typedef odb::query<Message> query;
                typedef odb::result<Message> result;
                _mysql_client->erase_query<Message>(query::chat_session_id == chat_session_id);
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("根据聊天会话ID删除聊天会话消息信息失败, chat_session_id: {}, 失败原因: {}", chat_session_id, e.what());
                return false;
            }
            return true;
        }

        // 通过会话ID、消息数量,获取最近N条消息(逆序 + limitN即可), 并按时间进行排序
        std::vector<Message> get_recent_N_message(const std::string &cssid, int N)
        {
            std::vector<Message> messages;
            try
            {
                odb::transaction trans(_mysql_client->begin());
                typedef odb::query<Message> query;
                typedef odb::result<Message> result;
                // select * from message where [chat_session_id = cssid order by create_time DESC limit N]
                std::stringstream condition;
                condition << "chat_session_id = '" << cssid << "' ";
                condition << "order by create_time DESC limit " << N;
                result r(_mysql_client->query<Message>(condition.str()));
                for (result::iterator i(r.begin()); i != r.end(); ++i)
                    messages.push_back(*i);

                std::reverse(messages.begin(), messages.end()); // 按照时间的先后顺序调整
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("获取最近N条消息失败, chat_session_id: {}, 失败原因: {}", cssid, e.what());
            }
            return messages;
        }

        // 通过会话ID、时间范围,获取指定时间段内的消息,并按时间进行排序
        std::vector<Message> get_time_range_message(const std::string &cssid,
                                                    boost::posix_time::ptime &start_time,
                                                    boost::posix_time::ptime &end_time)
        {
            std::vector<Message> messages;
            try
            {
                odb::transaction trans(_mysql_client->begin());
                typedef odb::query<Message> query;
                typedef odb::result<Message> result;
                result r(_mysql_client->query<Message>(query::chat_session_id == cssid &&
                                                       query::create_time >= start_time &&
                                                       query::create_time <= end_time));
                for (result::iterator i(r.begin()); i != r.end(); ++i)
                    messages.push_back(*i);

                std::reverse(messages.begin(), messages.end()); // 按照时间的先后顺序调整
                trans.commit();
            }
            catch (const std::exception &e)
            {
                LOG_ERROR("获取时间段内消息失败, chat_session_id: {}, start_time: {}, end_time: {} 失败原因: {}",
                          cssid, boost::posix_time::to_simple_string(start_time), boost::posix_time::to_simple_string(end_time), e.what());
            }
            return messages;
        }

    private:
        std::shared_ptr<odb::core::database> _mysql_client;
    };
}

ES(文本消息管理)

提供以下几个操作:

1、创建索引库。

2、添加文本新数据。

3、通过消息ID,移除消息信息。

4、通过关键字,查找/筛选数据。

elasticsearchDataManage.hpp:

    class ESMessageInfoManage
    {
    public:
        using ptr = std::shared_ptr<ESMessageInfoManage>;
        ESMessageInfoManage(const std::shared_ptr<elasticlient::Client> &es_client) : _es_client(es_client) {}
        ~ESMessageInfoManage() {}

    public:
        // 创建消息信息索引
        bool createIndex()
        {
            bool res = ESIndexCreate(_es_client, "message")
                           .append("user_id", "keyword", "standard", false)
                           .append("message_id", "keyword", "standard", false)
                           .append("create_time", "long", "standard", false)
                           .append("chat_session_id", "keyword", "standard", true)
                           .append("message_content")
                           .create();
            if (res == false)
            {
                LOG_INFO("消息信息索引创建失败");
                return false;
            }
            return true;
        }

        // 新增消息数据
        bool appendData(const std::string &user_id,
                        const std::string &message_id,
                        const long create_time,
                        const std::string &chat_session_id,
                        const std::string &message_content)
        {
            bool res = ESDataInsert(_es_client, "message")
                           .append("user_id", user_id)
                           .append("message_id", message_id)
                           .append("create_time", create_time)
                           .append("chat_session_id", chat_session_id)
                           .append("message_content", message_content)
                           .insert(message_id);
            if (res == false)
            {
                LOG_ERROR("消息数据插入失败");
                return false;
            }
            return true;
        }

        // 通过消息id 移除该消息
        bool remove(const std::string &message_ud)
        {
            bool ret = ESDataRemove(_es_client, "message").remove(message_ud);
            if (ret == false)
            {
                LOG_ERROR("消息数据删除失败!");
                return false;
            }
            LOG_INFO("消息数据删除成功!");
            return true;
        }

        // 关键字查询消息 - 关键字 + 会话ID
        std::vector<yangz::Message> search(const std::string &key, const std::string &cssid)
        {
            std::vector<yangz::Message> messages;
            Json::Value json_message = ESDataSearch(_es_client, "message")
                                           .append_must_term("chat_session_id.keyword", cssid)
                                           .append_must_match("message_content", key)
                                           .search();
            if (json_message.isArray() == false)
            {
                LOG_ERROR("消息搜索结果为空,或者结果不是数组类型");
                return messages;
            }
            int sz = json_message.size();
            for (int i = 0; i < sz; i++)
            {
                yangz::Message message;
                message.set_user_id(json_message[i]["_source"]["user_id"].asString());
                message.set_message_id(json_message[i]["_source"]["message_id"].asString());
                boost::posix_time::ptime ctime(boost::posix_time::from_time_t(
                    json_message[i]["_source"]["create_time"].asInt64()));
                message.set_create_time(ctime);
                message.set_chat_session_id(json_message[i]["_source"]["chat_session_id"].asString());
                message.set_message_content(json_message[i]["_source"]["message_content"].asString());
                messages.push_back(message);
            }
            return messages;
        }

    private:
        std::shared_ptr<elasticlient::Client> _es_client;
    };

编写proto文件

消息存储proto

既然消息模块有3个功能性接口,那么就有3个对应的请求与响应结构,以及最终的PRC调用(messageStore.proto)。

其中消息存储是内部接口,不做功能性接口显示。