目录
服务器的消费者管理模块在mqserver目录下实现。
第一节:代码实现
创建一个名为mq_consumer.hpp的文件,打开并做好前置工作:
#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
// 以消息队列为单元管理消费者
#include <iostream>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <functional>
namespace zd
{};
#endif
1-1.Consumer类
要管理消费者,首先要有消费者,定义class Consumer类,它的实现如下:
// 消费者的回调函数类型 using ConsumerCallback = std::function<void(const std::string,const BasicProperties*,const std::string)>; class Consumer { public: using ptr = std::shared_ptr<Consumer>; std::string tag; // 消费者唯一标识 std::string qname; // 消费者订阅的队列名称 bool auto_ack; // 自动确认标志 ConsumerCallback callback; // 订阅队列收到消息后调用,作用是推送消息给消费者 Consumer(){} Consumer(const std::string& ctag,const std::string& queue_name,bool ack_flag,const ConsumerCallback& cb): tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(cb) {} };
tag:消费者的唯一标识,由用户设置
qname:消费者订阅的队列
auto_ack:自动确认标志,如果设置为true,服务器再推送完消息后会直接删除消息,不等待消费者的确认请求。
callback:消费者对消息的处理函数,在服务端它的功能是固定的:将消息发送给对应的客户端消费者,因为服务器的消费者并不是真正的消费者,客户端消费者才是真正的消费者。而客户端消费者的消息处理函数才由用户自己定义。
1-2.QueueConsumer类
这个类用来管理一个队列的所有订阅者, 而且当一条消息到来时,不是所有队列的订阅者都能获得,只有队列当前轮询的一个消费者可以获得这条信息,这种叫做队列模型。
还有每个订阅者都能获得消息的订阅/发布模型,现在实现的是队列模型,项目基本完成后也会实现一下订阅/发布模型。
先实现一下构造函数和成员变量:
// 一个队列的消费者管理 class QueueConsumer { public: using ptr = std::shared_ptr<QueueConsumer>; QueueConsumer(const std::string& qname): _qname(qname), _rr_ser(0) {} private: std::mutex _mtx; std::string _qname; // 当前管理的队列名称 size_t _rr_ser; // 轮转序号:决定当前把消息推送给哪个消费者 std::vector<Consumer::ptr> _consumers; // 该队列的所有消费者 };
消费者管理接口:
// 新增消费者 Consumer::ptr create(const std::string& ctag,bool ack_flag,const ConsumerCallback& cb) { std::unique_lock<std::mutex> lock(_mtx); // 1.判断消费者重复添加 for(const auto& consumer:_consumers) { if(consumer->tag == ctag) return nullptr; } // 2.构造消费者对象并添加 Consumer::ptr consumer = std::make_shared<Consumer>(ctag,_qname,ack_flag,cb); _consumers.push_back(consumer); return consumer; } // 移除消费者 void remove(const std::string& ctag) { std::unique_lock<std::mutex> lock(_mtx); for(auto it = _consumers.begin();it != _consumers.end();it++) { if((*it)->tag == ctag) { _consumers.erase(it); return; } } }
获取当前轮询的消费者,决定消息的去向:
// 获取当前轮询的消费者 Consumer::ptr choose() { std::unique_lock<std::mutex> lock(_mtx); if(_consumers.size() == 0) return nullptr; return _consumers[_rr_ser++%_consumers.size()];; }
取模是为了不会越界访问。
其他的功能函数:
// 判空 bool empty() { std::unique_lock<std::mutex> lock(_mtx); return _consumers.empty(); } // 判断消费者是否存在 bool exists(const std::string& ctag) { std::unique_lock<std::mutex> lock(_mtx); for(const auto& consumer:_consumers) { if(consumer->tag == ctag) return true; } return false; } // 清理所有消费者 void clear() { std::unique_lock<std::mutex> lock(_mtx); _consumers.clear(); _rr_ser = 0; }
析构函数调用clear()接口,这样当队列被删除时,也会删除队列的QueueConsumer对象,析构函数就自动清理数据了:
~QueueConsumer()
{
clear();
}
1-3.QueueConsumerManger类
这个类用来管理服务器的所有队列的消费者。
基本上就是对class QueueConsumer的封装,但是在队列执行自己的函数的时候不要上锁,因为每个队列是独立的。
// 所有队列的消费者管理 class QueueConsumerManger { public: using ptr = std::shared_ptr<QueueConsumerManger>; QueueConsumerManger(){} // 插入一个消费者管理队列 void initQueueConsumer(const std::string& qname) { std::unique_lock<std::mutex> lock(_mtx); // 1.判断重复 auto it = _queue_consumers.find(qname); if(it != _queue_consumers.end()) { return; } // 2.构造并插入 QueueConsumer::ptr queueConsumer = std::make_shared<QueueConsumer>(qname); _queue_consumers.insert(std::make_pair(qname,queueConsumer)); } // 移除一个消费者管理队列 void destoryQueueConsumer(const std::string& qname) { std::unique_lock<std::mutex> lock(_mtx); // 1.判断存在 auto it = _queue_consumers.find(qname); if(it == _queue_consumers.end()) { return; } // 2.移除 _queue_consumers.erase(it); } // 向指定队列新增一个消费者 Consumer::ptr create(const std::string& qname,const std::string& ctag,bool ack_flag,const ConsumerCallback& cb) { QueueConsumer::ptr queue; { std::unique_lock<std::mutex> lock(_mtx); // 1.判断队列存在 auto qit = _queue_consumers.find(qname); if(qit == _queue_consumers.end()) { LOG("没有找到消费者管理队列 %s",qname.c_str()); return nullptr; } queue = qit->second; } // 2.调用队列管理的插入 return queue->create(ctag,ack_flag,cb); } // 从指定队列中移除一个消费者 void remove(const std::string& qname,const std::string& ctag) { QueueConsumer::ptr queue; { std::unique_lock<std::mutex> lock(_mtx); // 1.判断队列存在 auto qit = _queue_consumers.find(qname); if(qit == _queue_consumers.end()) { LOG("没有找到消费者管理队列 %s",qname.c_str()); return; } queue = qit->second; } // 调用队列管理的移除 queue->remove(ctag); } // 获取一个消费者管理队列的当前轮询消费者 Consumer::ptr choose(const std::string& qname) { QueueConsumer::ptr queue; { std::unique_lock<std::mutex> lock(_mtx); // 1.判断队列存在 auto qit = _queue_consumers.find(qname); if(qit == _queue_consumers.end()) { LOG("没有找到消费者管理队列 %s",qname.c_str()); return nullptr; } queue = qit->second; } return queue->choose(); } // 判空 bool empty(const std::string& qname) { QueueConsumer::ptr queue; { std::unique_lock<std::mutex> lock(_mtx); // 1.判断队列存在 auto qit = _queue_consumers.find(qname); if(qit == _queue_consumers.end()) { LOG("没有找到消费者管理队列 %s",qname.c_str()); return false; } queue = qit->second; } return queue->empty(); } // 判断某个队列的某个消费者是否存在 bool exists(const std::string& qname,const std::string& ctag) { QueueConsumer::ptr queue; { std::unique_lock<std::mutex> lock(_mtx); // 1.判断队列存在 auto qit = _queue_consumers.find(qname); if(qit == _queue_consumers.end()) { LOG("没有找到消费者管理队列 %s",qname.c_str()); return false; } queue = qit->second; } return queue->exists(ctag); } // 清理 void clear() { std::unique_lock<std::mutex> lock(_mtx); _queue_consumers.clear(); } size_t size() { return _queue_consumers.size(); } private: std::unordered_map<std::string,QueueConsumer::ptr> _queue_consumers; std::mutex _mtx; };
第二节:单元测试
打开mqtest目录,创建mq_consumer_test.cc,添加以下代码进行测试:
#include "../mqserver/mq_consumer.hpp" #include <gtest/gtest.h> #include <iostream> #include <unordered_map> zd::QueueConsumerManger::ptr qcmp; void cb(const std::string,const zd::BasicProperties*,const std::string){}; // 全局测试套件------------------------------------------------ // 自己初始化自己的环境,使不同单元测试之间解耦 class ConsumerTest :public testing::Environment { public: // 全部单元测试之前调用一次 virtual void SetUp() override { // std::cout << "单元测试执行前的环境初始化" << std::endl; qcmp = std::make_shared<zd::QueueConsumerManger>(); } // 全部单元测试之后调用一次 virtual void TearDown() override { // std::cout << "单元测试执行后的环境清理" << std::endl; // emp->clear(); } }; // 单元测试 // 测试名称与类名称相同,则会先调用SetUp // 测试队列的新增和移除 TEST(ConsumerTest,ConsumerTest_test1_Test) { std::cout << "单元测试-1" << std::endl; // 新增队列 qcmp->initQueueConsumer("q1"); ASSERT_EQ(qcmp->size(),1); qcmp->initQueueConsumer("q2"); ASSERT_EQ(qcmp->size(),2); qcmp->initQueueConsumer("q3"); ASSERT_EQ(qcmp->size(),3); qcmp->initQueueConsumer("q4"); ASSERT_EQ(qcmp->size(),4); qcmp->initQueueConsumer("q5"); ASSERT_EQ(qcmp->size(),5); qcmp->initQueueConsumer("q1"); ASSERT_EQ(qcmp->size(),5); // 移除队列 qcmp->destoryQueueConsumer("q2"); ASSERT_EQ(qcmp->size(),4); qcmp->destoryQueueConsumer("q6"); ASSERT_EQ(qcmp->size(),4); // q1 q3 q4 q5 } // 测试消费者的新增和移除 TEST(ConsumerTest,ConsumerTest_test2_Test) { std::cout << "单元测试-2" << std::endl; // 向队列新增消费者 qcmp->create("q1","consumer-1",false,cb); qcmp->create("q1","consumer-2",false,cb); qcmp->create("q1","consumer-3",false,cb); qcmp->create("q1","consumer-4",false,cb); qcmp->create("q1","consumer-5",false,cb); ASSERT_EQ(qcmp->exists("q1","consumer-1"),true); ASSERT_EQ(qcmp->exists("q1","consumer-6"),false); // 从队列移除消费者 qcmp->remove("q1","consumer-2"); ASSERT_EQ(qcmp->exists("q1","consumer-2"),false); ASSERT_EQ(qcmp->exists("q2","consumer-2"),false); // q2之前已经被移除了 // q1:c1 c3 c4 c5 } // 测试当前轮询接口 TEST(ConsumerTest,ConsumerTest_test3_Test) { std::cout << "单元测试-3" << std::endl; zd::Consumer::ptr cp1 = qcmp->choose("q1"); zd::Consumer::ptr cp2 = qcmp->choose("q1"); zd::Consumer::ptr cp3 = qcmp->choose("q1"); zd::Consumer::ptr cp4 = qcmp->choose("q1"); std::cout << std::endl; std::cout << cp1->tag << " " << cp2->tag << " " << cp3->tag << " " << cp4->tag << std::endl; std::cout << std::endl; zd::Consumer::ptr cp5 = qcmp->choose("q3"); ASSERT_EQ(cp5.get(),nullptr); } // 单元测试全部结束后调用TearDown // ---------------------------------------------------------- int main(int argc,char** argv) { testing::InitGoogleTest(&argc,argv); testing::AddGlobalTestEnvironment(new ConsumerTest); // 注册Test的所有单元测试 if(RUN_ALL_TESTS() != 0) // 运行所有单元测试 { printf("单元测试失败!\n"); } return 0; }
编译:
mq_consumer_test:mq_consumer_test.cc g++ -std=c++14 $^ -o $@ -lgtest -lprotobuf
执行结果:
没有错误,打印也符合预期。
服务器消费者管理模块就完成了。
下期预告:
完成消费者管理模块之后,下一个要完成的是信道管理模块。
对于服务器来说,一个信道就可以是一个消费者。如果客户端创建了一个信道(c-1),客户端就会发送创建信道的请求给服务器,服务器收到请求后也会创建一个对应的信道(c-1')。当客户端使用信道进行订阅的时候,服务器的对应信道就会承担消费者的角色,服务器会把消息推送给这个信道,这个信道再把消息推送给客户端的信道。
以上过程中,消息都被推送给了服务器的c-1',所以对于服务器来说,c-1'才是消费者,而不是真正使用消息的客户端信道c-1。