C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(六)

发布于:2025-08-07 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

队列消费者/订阅者管理

代码实现

信道管理模块

代码实现

连接管理模块

代码实现


队列消费者/订阅者管理

        客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前用户想要订阅哪一个队列的消息。
        而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的意义了,因此也需要将相关的消费者信息给删除掉。

基于以上需求,因此需要对订阅者信息进行管理。

  • 定义消费者信息结构
    • 消费者标识。
    • 订阅的队列名称。
    • 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,如何消费?对于服务端来说就是调用这个个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)。
      • void(const std::string&, const BasicProperties&, const std::string&)
    • 是否自动应答标志。(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等待客户端确认)。
  • 消费者管理--以队列为单元进行管理-队列消费者管理结构
    • 操作
      • 新增消费者:信道提供的服务是订阅队列消息的时候创建。
      • 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除。
      • 获取消费者:从队列所有的消费者中按序取出一个消费者进行消息的推送。
      • 判断队列消费者是否为空。
      • 判断指定消费者是否存在。
      • 清理队列所有消费者。
    • 元素
      • 消费者管理结构:vector。
      • 轮转序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者消费即可,因此采用 RR 轮转。
      • 互斥锁:保证线程安全。
      • 队列名称。
  • 对消费者进行统一管理结构
    • 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)。
    • 向指定队列新增消费者(客户端订阅指定队列消息的时候):新增完成的时候返回消费者对象。
    • 从指定队列移除消费者(客户端取消订阅的时候)。
    • 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理单元对象。
    • 从指定队列获取一个消费者(轮询获取-消费者轮换消费起到负载均衡的作用)。
    • 判断队列中消费者是否为空。
    • 判断队列中指定消费者是否存在。
    • 清理所有消费者。

代码实现

#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>
#include <functional>

namespace jiuqi
{
    // 消费者表示, 属性, 消息
    using ConsumerCallback = std::function<void(const std::string&, const BasicProperties*, const std::string&)>;
    struct Consumer
    {
        using ptr = std::shared_ptr<Consumer>;
        std::string tag;           // 消费者标识
        std::string qname;         // 绑定的队列名称
        bool auto_ack;             // 是否自动应答
        ConsumerCallback callback; // 回调函数

        Consumer() { DEBUG("new Consumer %p", this); }
        Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
            : tag(ctag), qname(queue_name), auto_ack(ack), callback(cb) { DEBUG("new Consumer %p", this); }
        ~Consumer() { DEBUG("del Consumer %p", this); } 
    };

    // 以队列为单元的消费者管理结构
    class QueueConsumer
    {
    public:
        using ptr = std::shared_ptr<QueueConsumer>;
        QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0) {}

        // 新增消费者
        Consumer::ptr create(const std::string &ctag, bool ack, const ConsumerCallback &cb)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            // 判断是否重复
            for (auto &consumer : _consumers)
                if (consumer->tag == ctag)
                    return nullptr;
            // 构建消费者
            Consumer::ptr consumer = std::make_shared<Consumer>(ctag, _qname, ack, cb);
            _consumers.push_back(consumer);
            return consumer;
        }
        // 移除消费者
        void remove(const std::string &ctag)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            for (auto it = _consumers.begin(); it != _consumers.end(); it++)
            {
                if ((*it)->tag == ctag)
                {
                    _consumers.erase(it);
                    return;
                }
            }
        }
        // 获取消费者
        Consumer::ptr choose()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if (_consumers.empty())
                return nullptr;

            int index = _rr_seq % _consumers.size();
            ++_rr_seq;
            return _consumers[index];
        }
        // 是否为空
        bool empty()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return _consumers.empty();
        }
        // 某个消费者是否存在
        bool exists(const std::string &ctag)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            for (auto it = _consumers.begin(); it != _consumers.end(); it++)
                if ((*it)->tag == ctag)
                    return true;
            return false;
        }
        // 清空
        void clear()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _consumers.clear();
            _rr_seq = 0;
        }

    private:
        std::string _qname;
        std::mutex _mutex;
        uint64_t _rr_seq; // 轮转序号
        std::vector<Consumer::ptr> _consumers;
    };

    class ConsumerManager
    {
    public:
        using ptr = std::shared_ptr<ConsumerManager>;
        ConsumerManager() {}

        void initQueueConsumer(const std::string &qname)
        {
            std::unique_lock<std::mutex> lock_mutex;

            auto it = _qconsumers.find(qname);
            if (it != _qconsumers.end())
                return;

            QueueConsumer::ptr qc = std::make_shared<QueueConsumer>(qname);
            _qconsumers.insert(std::make_pair(qname, qc));
        }

        void destoryQueueConsumer(const std::string &qname)
        {
            std::unique_lock<std::mutex> lock_mutex;
            for (auto it = _qconsumers.begin(); it != _qconsumers.end(); it++)
            {
                if (it->first == qname)
                {
                    _qconsumers.erase(it);
                    return;
                }
            }
        }

        Consumer::ptr create(const std::string &ctag, const std::string &qname, bool ack, const ConsumerCallback &cb)
        {
            QueueConsumer::ptr qcp;
            {
                std::unique_lock<std::mutex> lock_mutex;
                auto it = _qconsumers.find(qname);
                if (it == _qconsumers.end())
                {
                    DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
                    return nullptr;
                }   
                qcp = it->second; 
            }
            return qcp->create(ctag, ack, cb);
        }

        void remove(const std::string &ctag, const std::string &qname)
        {
            QueueConsumer::ptr qcp;
            {
                std::unique_lock<std::mutex> lock_mutex;
                auto it = _qconsumers.find(qname);
                if (it == _qconsumers.end())
                {
                    DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
                    return;
                }   
                qcp = it->second; 
            }
            return qcp->remove(ctag);
        }

        Consumer::ptr choose(const std::string &qname)
        {
            QueueConsumer::ptr qcp;
            {
                std::unique_lock<std::mutex> lock_mutex;
                auto it = _qconsumers.find(qname);
                if (it == _qconsumers.end())
                {
                    DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
                    return nullptr;
                }   
                qcp = it->second; 
            }
            return qcp->choose();
        }

        bool empty(const std::string &qname)
        {
            QueueConsumer::ptr qcp;
            {
                std::unique_lock<std::mutex> lock_mutex;
                auto it = _qconsumers.find(qname);
                if (it == _qconsumers.end())
                {
                    DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
                    return true;
                }   
                qcp = it->second; 
            }
            return qcp->empty();
        }

        bool exists(const std::string &ctag, const std::string &qname)
        {
            QueueConsumer::ptr qcp;
            {
                std::unique_lock<std::mutex> lock_mutex;
                auto it = _qconsumers.find(qname);
                if (it == _qconsumers.end())
                {
                    DEBUG("没有找到 %s 的消费者管理句柄", qname.c_str());
                    return false;
                }   
                qcp = it->second; 
            }
            return qcp->exists(ctag);
        }

        void clear()
        {
            std::unique_lock<std::mutex> lock_mutex;
            _qconsumers.clear();
        }

    private:
        std::mutex _mutex;
        std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;
    };
}

信道管理模块

        在 AMQP 模型中,除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。而信道模块就是再次将上述模块进行整合提供服务的模块。

  • 管理信息:
    • 信道 ID:信道的唯一标识。
    • 信道关联的消费者:用于消费者信道在关闭的时候取消订阅,删除订阅者信息。
    • 信道关联的连接:用于向客户端发送数据(响应,推送的消息)。
    • protobuf 协议处理句柄:网络通信前的协议处理。
    • 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息。
    • 虚拟机句柄:交换机/队列/绑定/消息数据管理。
    • 工作线程池句柄(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)。
  • 管理操作:
    • 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)。
    • 提供绑定&解绑队列操作。
    • 提供订阅&取消订阅队列消息操作。
    • 提供发布&确认消息操作。
  • 信道管理
    • 信道的增删查。

代码实现

#pragma once
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/threadpool.hpp"
#include "route.hpp"
#include "consumer.hpp"
#include "vhost.hpp"
#include <unordered_map>
#include <memory>

namespace jiuqi
{
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;

    using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
    using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
    using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
    using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
    using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
    using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
    using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;
    using queueUnbindRequestPtr = std::shared_ptr<queueUnbindRequest>;
    using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
    using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
    using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
    using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;

    class Channel
    {
    public:
        using ptr = std::shared_ptr<Channel>;
        Channel(const std::string &cid,
                const muduo::net::TcpConnectionPtr &conn,
                const ProtobufCodecPtr &codec,
                const ConsumerManager::ptr &cmp,
                const VirtualHost::ptr &vhost,
                const ThreadPool::ptr &pool)
            : _cid(cid),
              _conn(conn),
              _codec(codec),
              _cmp(cmp),
              _vhost(vhost),
              _pool(pool)
        {
            DEBUG("new Channel %p", this);
        }

        ~Channel()
        {
            if (_consumer != nullptr)
                _cmp->remove(_consumer->tag, _consumer->qname);
            DEBUG("del Channel %p", this);
        }

        // 交换机的声明与删除
        void declareExchange(const declareExchangeRequestPtr &req)
        {
            bool ret = _vhost->declareExchange(req->ename(), req->etype(), req->durable(), req->auto_delete(), req->args());
            basicResponse(ret, req->rid(), req->cid());
        }
        void deleteExchange(const deleteExchangeRequestPtr &req)
        {
            _vhost->deleteExchange(req->ename());
            basicResponse(true, req->rid(), req->cid());
        }

        // 队列的声明与删除
        void declareQueue(const declareQueueRequestPtr &req)
        {
            bool ret = _vhost->declareQueue(req->qname(), req->durable(), req->exclusive(), req->auto_delete(), req->args());
            if (ret)
                _cmp->initQueueConsumer(req->qname());
            basicResponse(ret, req->rid(), req->cid());
        }
        void deleteQueue(const deleteQueueRequestPtr &req)
        {
            _cmp->destoryQueueConsumer(req->qname());
            _vhost->deleteQueue(req->qname());
            basicResponse(true, req->rid(), req->cid());
        }

        // 队列的绑定与解绑
        void queueBind(const queueBindRequestPtr &req)
        {
            bool ret = _vhost->bind(req->ename(), req->qname(), req->bindingkey());
            basicResponse(ret, req->rid(), req->cid());
        }
        void queueUnbind(const queueUnbindRequestPtr &req)
        {
            _vhost->unBind(req->ename(), req->qname());
            basicResponse(true, req->rid(), req->cid());
        }

        // 消息的发布
        void basicPublish(const basicPublishRequestPtr &req)
        {
            // 判断交换机是否存在
            auto ep = _vhost->selectExchange(req->ename());
            if (ep == nullptr)
                basicResponse(false, req->rid(), req->cid());
            // 进行交换路由
            QueueBindingMap qbm = _vhost->exchangeBinding(req->ename());
            BasicProperties *bp = nullptr;
            std::string routekey;
            if (req->has_properties())
            {
                routekey = req->properties().routing_key();
                bp = req->mutable_properties();
            }
            for (auto &binding : qbm)
            {
                if (Router::route(ep->type, routekey, binding.second->bindingKey))
                {
                    // 将消息添加到队列中
                    _vhost->basicPublish(binding.first, bp, req->body());
                    // 向线程池中添加一个消息消费任务(向指定队列的订阅者推送消息)
                    auto task = std::bind(&Channel::consume, this, binding.first);
                    _pool->push(task);
                }
            }
            basicResponse(true, req->rid(), req->cid());
        }
        // 消息的确认
        void basicAck(const basicAckRequestPtr &req)
        {
            _vhost->basicAck(req->qname(), req->mid());
            basicResponse(true, req->rid(), req->cid());
        }
        // 订阅队列消息
        void basicConsumer(const basicConsumeRequestPtr &req)
        {
            // 判断队列是否存在
            bool ret = _vhost->existsQueue(req->qname());
            if (!ret)
                return basicResponse(false, req->rid(), req->cid());
            // 创建队列消费者
            auto cb = std::bind(&Channel::callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
            // 创建了消费者之后, 当前的channel角色就是个消费者
            _consumer = _cmp->create(req->ctag(), req->qname(), req->auto_ack(), cb);
            basicResponse(true, req->rid(), req->cid());
        }
        // 取消订阅
        void basicCancel(const basicCancelRequestPtr &req)
        {
            _cmp->remove(req->ctag(), req->qname());
            basicResponse(true, req->rid(), req->cid());
        }

    private:
        void basicResponse(bool ok, const std::string &rid, const std::string &cid)
        {
            basicCommonResponse resp;
            resp.set_rid(rid);
            resp.set_cid(cid);
            resp.set_ok(ok);
            _codec->send(_conn, resp);
        }

        void consume(const std::string &qname)
        {
            MessagePtr mp = _vhost->basicConsume(qname);
            if (mp == nullptr)
            {
                DEBUG("%s 队列无消息", qname.c_str());
                return;
            }
            Consumer::ptr cp = _cmp->choose(qname);
            if (cp == nullptr)
            {
                DEBUG("%s 队列无消费者", qname.c_str())
                return;
            }
            cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());
            DEBUG("消费者: %s", cp->tag.c_str());
            if (cp->auto_ack)
                _vhost->basicAck(qname, mp->payload().properties().id());
        }

        void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
        {
            basicConsumeResponse resp;
            resp.set_cid(_cid);
            resp.set_ctag(tag);
            resp.set_body(body);
            if (bp)
            {
                resp.mutable_properties()->set_id(bp->id());
                resp.mutable_properties()->set_deliver_mode(bp->deliver_mode());
                resp.mutable_properties()->set_routing_key(bp->routing_key());
            }
            _codec->send(_conn, resp);
        }

    private:
        std::string _cid;
        Consumer::ptr _consumer;
        muduo::net::TcpConnectionPtr _conn;
        ProtobufCodecPtr _codec;
        ConsumerManager::ptr _cmp;
        VirtualHost::ptr _vhost;
        ThreadPool::ptr _pool;
    };

    class ChannelManager
    {
    public:
        using ptr = std::shared_ptr<ChannelManager>;
        ChannelManager() {}
        bool openChannl(const std::string &cid,
                        const muduo::net::TcpConnectionPtr &conn,
                        const ProtobufCodecPtr &codec,
                        const ConsumerManager::ptr &cmp,
                        const VirtualHost::ptr &vhost,
                        const ThreadPool::ptr &pool)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _channels.find(cid);
            if (it != _channels.end()) return false;
            auto channel = std::make_shared<Channel>(cid, conn, codec, cmp, vhost, pool);
            _channels.insert(std::make_pair(cid, channel));
            return true;
        }

        void closeChannel(const std::string &cid)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _channels.erase(cid);
        }

        Channel::ptr getChannel(const std::string &cid)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _channels.find(cid);
            if (it == _channels.end()) return nullptr;
            return it->second;            
        }
    private:
        std::mutex _mutex;
        std::unordered_map<std::string, Channel::ptr> _channels;
    };
}

连接管理模块

        向用户提供一个用于实现网络通信的 Connection 对象,从其内部可创建出粒度更轻的Channel 对象,用于与客户端进行网络通信。

  • 成员信息:
    • 连接关联的信道管理句柄(实现信道的增删查)。
    • 连接关联的实际用于通信的 muduo::net::Connection 连接。
    • protobuf 协议处理的句柄(ProtobufCodec 对象)。
    • 消费者管理句柄。
    • 虚拟机句柄。
    • 异步工作线程池句柄。
  • 连接操作:
    • 提供创建 Channel 信道的操作。
    • 提供删除 Channel 信道的操作。
  • 连接管理:
    • 连接的增删查。

代码实现

#pragma once
#include "channel.hpp"

namespace jiuqi
{
    class Connection
    {
    public:
        using ptr = std::shared_ptr<Connection>;
        Connection(const muduo::net::TcpConnectionPtr &conn,
                   const ProtobufCodecPtr &codec,
                   const ConsumerManager::ptr &cmp,
                   const VirtualHost::ptr &vhost,
                   const ThreadPool::ptr &pool)
            : _conn(conn),
              _codec(codec),
              _cmp(cmp),
              _vhost(vhost),
              _pool(pool),
              _channels(std::make_shared<ChannelManager>()) {}

        void openChannel(const openChannelRequestPtr &req)
        {
            bool ret = _channels->openChannl(req->cid(), _conn, _codec, _cmp, _vhost, _pool);
            basicResponse(ret, req->rid(), req->cid());
        }
        void closeChannel(const closeChannelRequestPtr &req) 
        {
            _channels->closeChannel(req->cid());
            basicResponse(true, req->rid(), req->cid());
        }
        Channel::ptr getChannel(const std::string &cid)
        {
            return _channels->getChannel(cid);
        }
        
    private:
        void basicResponse(bool ok, const std::string &rid, const std::string &cid)
        {
            basicCommonResponse resp;
            resp.set_rid(rid);
            resp.set_cid(cid);
            resp.set_ok(ok);
            _codec->send(_conn, resp);
        }

    private:
        muduo::net::TcpConnectionPtr _conn;
        ProtobufCodecPtr _codec;
        ConsumerManager::ptr _cmp;
        VirtualHost::ptr _vhost;
        ThreadPool::ptr _pool;
        ChannelManager::ptr _channels;
    };

    class ConnectionManager
    {
    public:
        using ptr = std::shared_ptr<ConnectionManager>;
        ConnectionManager() {}
        
        void newConnection(const muduo::net::TcpConnectionPtr &conn,
                   const ProtobufCodecPtr &codec,
                   const ConsumerManager::ptr &cmp,
                   const VirtualHost::ptr &vhost,
                   const ThreadPool::ptr &pool)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _conns.find(conn);
            if (it != _conns.end()) return;
            auto cnp = std::make_shared<Connection>(conn, codec, cmp, vhost, pool);
            _conns.insert(std::make_pair(conn, cnp));
        }

        void deleteConnection(const muduo::net::TcpConnectionPtr &conn)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _conns.erase(conn);
        }
        Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            auto it = _conns.find(conn);
            if (it == _conns.end()) return nullptr;
            return it->second;
        }
    private:
        std::mutex _mutex;
        std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns;
    };
}


网站公告

今日签到

点亮在社区的每一天
去签到