仿muduo库实现并发服务器
1.LoopThread模块
这个模块是为了将EventLoop与线程整合起来。一个EventLoop对应一个线程。
要知道EventLoop对象实例化时就会将它的thread_id绑定,然后后续的RuninLoop操作里面就会判断当前执行流的线程id与eventloop对应的线程id进行比较,如果相同就说明是同一个线程,如果不相同,就说明不是同一个线程执行。
这里要注意的是:
eventloop对象的实例化必须在线程创建之后,也就是在线程的入口函数中进行创建,然后eventloop对象一创建出来就绑定该线程。
就是构造一个新的模块,它在里边既有线程又有eventloop,而且它是先有线程然后在线程里边再去实例化eventlop,这样就能够保证eventloop在出现的第一时刻就跟我们的一个线程是绑定到一起的。
【获取eventloop对象】
这个模块,它将线程跟eventloop给整合到一起了,那么问题就来了,那当我们外界说一个新连接到来了,现在要给它分配一个eventloop对象,怎么给它分配呢?我总得能够获取到这个evetloop对象吧,所以在这个里边我们需要提供一个接口叫做GetLoop,获取到eventloop对象的指针。
那么当外界可以获取到这个eventloop对象指针之后,当新连接到来时,就可以将该线程的eventloop分配给该线程,该连接它就绑定在该eventloop对应的线程上工作。
比如在执行各种操作的时候,进行事件监控的时候,它就会绑定到我们eventloop对应的线程里边去,要进行某些操作的时候也是把任务压入到我们对eventloop,它的任务队列里边。
【获取evnetloop同步问题】
要避免当我们的线程刚创建起来之后,还没来得及实例化创建eventloop该对象的时候,这时候外界如果要获取该线程的eventloop,调用Getloop操作时,获取到的就为空。所以这里存在一个同步问题,也就是从属线程要先实例化eventloop对象,主线程才能够获取eventloop对象。
所以这里面需要两个东西:锁与条件变量。
这2个东西,它的一个功能就是用于那么实现我们的一个loop操作的一个同步关系。因为呢?我们要避免那么线程创建之后,但是我们的eventloop,还没有被实例化之前就去获取loop的操作,这样的获取是会出问题,就是一个空了对不对所以要给它同步管理起来,也就是说你获取loop的时候,必须是它loop已经那么实例化完了才可以之间的,你如果没有实例化完那就不让你获取。
也就是在获取eventloop对象指针时,设置一个条件:eventloop对象指针不为空。
如果这个时候不满足,则主线程就去条件变量下阻塞等待,如果满足条件就会被唤醒,去获取eventloop对象指针。
而在线程的入口函数里面,在刚创建实例化eventloop对象之后,就立即发送通知,唤醒主线程。
1.1成员变量
private:
std::thread _thread; // 用来创建线程,eventloop绑定的线程
EventLoop *_loop; // Eventloop的指针变量,这里不能实例化,必须在创建线程里面实例化eventloop
// 下面两个用来保证loop的同步性,必须先实例化完loop,才能获取loop
std::mutex _mutex; // 锁,用来保护loop资源
std::condition_variable _cond; // 条件变量
1.2构造函数
在构造中,就使用C++中的线程库,创建线程。
所以该模块对象一旦创建出来,就代表创建了一个线程以及对应的eventloop对象。
public:
LoopThread() : _loop(NULL), _thread(std::bind(&LoopThread::ThreadEntry, this))
{}
13线程入口函数
该线程一旦创建出来,就要执行的任务就是:
1.创建eventloop对象
2.唤醒主线程
3.启动eventloop
(eventloop一旦启动它就会循环监控在它上面的描述符的各种事件是否就绪,一旦就绪就会获取就绪的事件以及描述符,执行对应的回调函数,并且执行任务队列中的任务)
private:
// 线程的入口函数
void ThreadEntry()
{
// 首先创建对应的Eventloop对象
EventLoop loop;
_loop = &loop;
// 创建完就唤醒可能阻塞的主线程
_cond.notify_all();
// 该线程就开始启动工作:
_loop->Start();
// 1.进行事件监控,并获取就绪的事件和fd 2.进行就绪事件处理3.执行任务队列里的任务
}
1.4获取eventloop对象GetLoop()
// 获取该线程对应的loop是由主线执行的,所以存在线程安全问题,并且还要保证loop的同步问题
EventLoop *GetLoop()
{
EventLoop *loop = NULL; // 为了保证临界资源_loop的安全性,在访问时最后将临界资源拷贝过去,最后返回的时候就不涉及临界资源的访问
{
std::unique_lock<std::mutex> _lock(_mutex); // 加锁
// 确保获取_loop之前已经被实例化
_cond.wait(_lock, [&]()
{ return _loop != NULL; }); // 如果这时_loop为空,则主线程就一直阻塞在条件变量的队列中
loop = _loop;
}
return loop;
}
实现了我们的一个主从react模型,主线程只负责对监听套件字的处理,获取新连接
保证我们连接的一个获取效率不会因为我们的一个业务的处理而导致无法去获取连接
而我们的从属react呢?当它获取到新连接之后,将我们的新连接分配给我们的从属react线程中,让从属线程对它进行一个事件监控,进行事件监控然后事件的处理以及业务的处理。
2.LoopThreadPool模块
LoopThreadPool线程池是对所有的LoopThread线程进行管理及分配。
主要提供的功能是:
1.能够让用户设置创建几个线程。
不过要注意当用户设置的从线程数据为0时,这时候新连接的事件监控就必须要有主线程中的baseloop进行监控管理。所以该线程池中必须要传入主线程的baseloop对象,以防止从属线程的数量为0。
在主从Reactor模型中,主线程只负责获取新连接,而从属线程负责对连接进行监控和处理,当线程池中线程的个数为0时,就变成了单Reactor服务器,主线程既负责或者新连接,也负责新连接的监控与处理。
2.能够将线程池中对应线程的eventloop对象分配出去。
如果线程池中线程的个数为0 ,则将主线程的Eventloop分配给新连接,进行监控与处理。
如果线程池中线程的个数有多个,则采用RR轮转思想,进行线程的分配。(就是将对应线程的Eventloop对象获取到然后分配给Connection)
将所有线程的Eventloop对象都存储起来,然后依次分配出去即可。
2.1成员变量
private:
int _thread_count; // 要创建的线程的数量
int _loop_idx; // 要分配的loop下标
EventLoop *_baseloop; // 主线程的loop,外界传入进来,如果要创建的从线程数据为0,那么就分配出去的就是主线程的loop
std::vector<LoopThread *> _threads; // 保存所有的LoopThread线程对象
std::vector<EventLoop *> _loops; // 保存所有线程对应的eventloop,然后依次分配出去。
2.2构造函数
线程的数量在初始化时不设置,由用户调用接口设置。
外界需要将主线程的baseloop对象传入进来,以防止线程池中没有线程。
LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _loop_idx(0), _baseloop(baseloop) {}
2.3配置线程数量
void SetThreadCount(int count)
{
_thread_count = count;
}
2.4按照配置数量创建线程
创建线程时,将所有线程以及线程对应的eventloop对象都存储在数组中。
然后根据下标就可以依次分配线程对应的eventloop对象了。
void CreateLoopThread()
{
if (_thread_count > 0) // 如果大于0就将所有的线程都创建出来,如果小于0,那么就必须要用主线程的baseloop。
{
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++)
{
// 创建线程,并将所有的线程管理起来
_threads[i] = new LoopThread();
// 并保存管理所有线程对应的loop
_loops[i] = _threads[i]->GetLoop();
}
}
}
2.5依次分配Eventloop对象
如果线程池中线程的个数为0,这时候分配出去的eventloop对象就是主线程的baseloop对象。主线程eventloop既获取连接,也进行连接监控与处理。
否则就按照轮转形式依次分配出去。
EventLoop *AssignLoop()
{
// 如果从线程的数量为0,那么分配出去的loop就是主线程对应的loop
if (_thread_count == 0)
{
return _baseloop;
}
_loop_idx = (_loop_idx + 1) % _thread_count;
return _loops[_loop_idx];
}