C++ - 仿 RabbitMQ 实现消息队列--服务器模块实现

发布于:2025-08-07 ⋅ 阅读:(13) ⋅ 点赞:(0)

 

目录

创建 MQBrokerServer 


       服务器模块我们借助 Muduo 网络库来实现。        

  • _server:Muduo 库提供的一个通用 TCP 服务器, 我们可以封装这个服务器进行TCP 通信。
  • _baseloop:主事件循环器, 用于响应 IO 事件和定时器事件,主 loop 主要是为了响应监听描述符的 IO 事件。
  • _codec: 一个 protobuf 编解码器, 我们在 TCP 服务器上设计了一层应用层协议,这个编解码器主要就是负责实现应用层协议的解析和封装。
  • _dispatcher:一个消息分发器, 当 Socket 接收到一个报文消息后, 我们需要按照消息的类型, 即上面提到的 typeName 进行消息分发, 会不不同类型的消息分发相对应的的处理函数中。
  • _consumer: 服务器中的消费者信息管理句柄。
  • _threadpool: 异步工作线程池,主要用于队列消息的推送工作。
  • _connections: 连接管理句柄,管理当前服务器上的所有已经建立的通信连接。
  • _virtual_host:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理。

创建 MQBrokerServer 

        BrokerServer 模块是对整体服务器所有模块的整合,接收客户端的请求,并提供服务。基于前边实现的简单的翻译服务器代码,进行改造,只需要实现服务器内部提供服务的各个业务接口即可。
        在各个业务处理函数中,也比较简单,创建信道后,每次请求过来后,找到请求对应的信道句柄,通过句柄调用前边封装好的处理接口进行请求处理,最终返回处理结果。

#pragma once
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include "connection.hpp"
#include "consumer.hpp"
#include "vhost.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/threadpool.hpp"

#include <iostream>
#include <unistd.h>
#include <unordered_map>

namespace jiuqi
{
    #define DBFILE "/meta.db"
    #define VHOST "vhost"
    class BrokerServer
    {
    public:
        using MessagePtr = std::shared_ptr<google::protobuf::Message>;

        BrokerServer(int port, const std::string &basicDir)
            : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
                      "server", muduo::net::TcpServer::kReusePort),
              _dispatcher(std::bind(&BrokerServer::onUnknownMessage, this,
                                    std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
              _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                               std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
              _vhost(std::make_shared<VirtualHost>(VHOST, basicDir, basicDir + DBFILE)),
              _consumer_manager(std::make_shared<ConsumerManager>()),
              _connection_manager(std::make_shared<ConnectionManager>()),
              _threadpool(std::make_shared<ThreadPool>())
        {
            QueueMap qm = _vhost->allqueue();
            for (auto &queue : qm)
            {
                _consumer_manager->initQueueConsumer(queue.first);
            }

            // 注册请求处理函数
            _dispatcher.registerMessageCallback<jiuqi::openChannelRequest>(std::bind(&BrokerServer::onOpenChannel, this,
                                                                                     std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::closeChannelRequest>(std::bind(&BrokerServer::onCloseChannel, this,
                                                                                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::declareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange, this,
                                                                                         std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::deleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange, this,
                                                                                        std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::declareQueueRequest>(std::bind(&BrokerServer::onDeclareQueue, this,
                                                                                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::deleteQueueRequest>(std::bind(&BrokerServer::onDeleteQueue, this,
                                                                                     std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::queueBindRequest>(std::bind(&BrokerServer::onQueueBind, this,
                                                                                   std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::queueUnbindRequest>(std::bind(&BrokerServer::onQueueUnbind, this,
                                                                                     std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::basicPublishRequest>(std::bind(&BrokerServer::onBasicPublish, this,
                                                                                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::basicAckRequest>(std::bind(&BrokerServer::onBasicAck, this,
                                                                                  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::basicConsumeRequest>(std::bind(&BrokerServer::onBasicConsume, this,
                                                                                      std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _dispatcher.registerMessageCallback<jiuqi::basicCancelRequest>(std::bind(&BrokerServer::onBasicCancel, this,
                                                                                     std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
                                                 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _server.setConnectionCallback(std::bind(&BrokerServer::onConnection, this, std::placeholders::_1));
        }

        void start()
        {
            _server.start();
            _baseloop.loop();
        }

    private:
        // 打开信道
        void onOpenChannel(const muduo::net::TcpConnectionPtr &conn,
                           const openChannelRequestPtr &message,
                           muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("打开信道时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }

            mconn->openChannel(message);
        }

        // 关闭信道
        void onCloseChannel(const muduo::net::TcpConnectionPtr &conn,
                            const closeChannelRequestPtr &message,
                            muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("关闭信道时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            mconn->closeChannel(message);
        }

        // 声明交换机
        void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn,
                               const declareExchangeRequestPtr &message,
                               muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("声明交换机时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("声明交换机时,没有找到对应的信道");
                return;
            }
            channel->declareExchange(message);
        }

        // 删除交换机
        void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn,
                              const deleteExchangeRequestPtr &message,
                              muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("删除交换机时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("删除交换机时,没有找到对应的信道");
                return;
            }
            channel->deleteExchange(message);           
        }

        // 声明队列
        void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn,
                            const declareQueueRequestPtr &message,
                            muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("声明队列时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("声明队列时,没有找到对应的信道");
                return;
            }
            channel->declareQueue(message);            
        }

        // 删除队列
        void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn,
                           const deleteQueueRequestPtr &message,
                           muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("删除队列时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("删除队列时,没有找到对应的信道");
                return;
            }
            channel->deleteQueue(message);                  
        }

        // 队列绑定
        void onQueueBind(const muduo::net::TcpConnectionPtr &conn,
                         const queueBindRequestPtr &message,
                         muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("队列绑定时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("队列绑定时,没有找到对应的信道");
                return;
            }
            channel->queueBind(message);                 
        }

        // 队列解绑
        void onQueueUnbind(const muduo::net::TcpConnectionPtr &conn,
                           const queueUnbindRequestPtr &message,
                           muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("队列解绑时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("队列解绑时,没有找到对应的信道");
                return;
            }
            channel->queueUnbind(message);              
        }

        // 消息发布
        void onBasicPublish(const muduo::net::TcpConnectionPtr &conn,
                            const basicPublishRequestPtr &message,
                            muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("消息发布时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("消息发布时,没有找到对应的信道");
                return;
            }
            channel->basicPublish(message);               
        }

        // 消息确认
        void onBasicAck(const muduo::net::TcpConnectionPtr &conn,
                        const basicAckRequestPtr &message,
                        muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("消息确认时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("消息确认时,没有找到对应的信道");
                return;
            }
            channel->basicAck(message);                   
        }

        // 队列消息订阅
        void onBasicConsume(const muduo::net::TcpConnectionPtr &conn,
                            const basicConsumeRequestPtr &message,
                            muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("队列消息订阅时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("队列消息订阅时,没有找到对应的信道");
                return;
            }
            channel->basicConsumer(message);                  
        }

        // 队列消息取消订阅
        void onBasicCancel(const muduo::net::TcpConnectionPtr &conn,
                           const basicCancelRequestPtr &message,
                           muduo::Timestamp)
        {
            Connection::ptr mconn = _connection_manager->getConnection(conn);
            if (mconn == nullptr) 
            {
                DEBUG("队列消息取消订阅时,没有找到对应的连接对象");
                conn->shutdown();
                return;
            }
            Channel::ptr channel = mconn->getChannel(message->cid());
            if (channel == nullptr)
            {
                DEBUG("队列消息取消订阅时,没有找到对应的信道");
                return;
            }
            channel->basicCancel(message);                 
        }

        void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn,
                              const MessagePtr &message,
                              muduo::Timestamp)
        {
            LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
            conn->shutdown();
        }

        void onConnection(const muduo::net::TcpConnectionPtr &conn)
        {
            if (conn->connected())
            {
                _connection_manager->newConnection(conn, _codec, _consumer_manager, _vhost, _threadpool);
                LOG_INFO << "连接建立成功";
            }
            else
            {
                _connection_manager->deleteConnection(conn);
                LOG_INFO << "连接关闭";
            }
        }

    private:
        muduo::net::EventLoop _baseloop;
        muduo::net::TcpServer _server;  // 服务器对象
        ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数
        ProtobufCodecPtr _codec;        // protobuf协议处理器--针对收到的请求数据进行prototo协议处理
        VirtualHost::ptr _vhost;
        ConsumerManager::ptr _consumer_manager;
        ConnectionManager::ptr _connection_manager;
        ThreadPool::ptr _threadpool;
    };
}


网站公告

今日签到

点亮在社区的每一天
去签到