timer 模块
timer的定义,cyberrt中timer模块用于设置定时器任务,字面意思,设置设置定时周期及出发频次(周期 or oneshot),到达指定时间时间触发callback
time wheel
时钟节拍轮,常见的定时器设计,例如ucos中的定时器,Linux Crontab等,cyberrt也是采用了时钟轮
时间轮(TimingWheel)简单来说,是一个 存储定时任务的循环队列,队列中的每个元素都可以放置一个定时任务列表(TimeBucket) 。TimeBucket 是一个list,链表中的每一项表示的都是定时任务(TimerTask)。
时钟轮示意图
类图
@startuml
class Timer{
+explicit Timer(TimerOption opt)
+void Start()
+void Stop()
-TimerOption timer_opt_;
-TimingWheel* timing_wheel_ = nullptr;
-std::shared_ptr<TimerTask> task_;
}
class TimingWheel {
- TimerBucket work_wheel_[WORK_WHEEL_SIZE]
- TimerBucket assistant_wheel_[ASSISTANT_WHEEL_SIZE]
- std::thread tick_thread_
+ ~TimingWheel()
+ void Start()
+ void Shutdown()
+ void Tick()
+ void AddTask(const std::shared_ptr<TimerTask>& task)
+ void TickFunc()
}
class TimerBucket {
- std::mutex mutex_
- std::list<std::weak_ptr<TimerTask>> task_list_
+ void AddTask(const std::shared_ptr<TimerTask>& task)
+ std::mutex& mutex()
+ std::list<std::weak_ptr<TimerTask>>& task_list()
}
struct TimerTask {
+ TimerTask(uint64_t timer_id)
+ uint64_t timer_id_
+ std::function<void()> callback
+ uint64_t interval_ms
+ uint64_t remainder_interval_ms
+ uint64_t next_fire_duration_ms
+ int64_t accumulated_error_ns
+ uint64_t last_execute_time_ns
+ std::mutex mutex
}
class TimerOption {
+ uint32_t period
+ std::function<void()> callback
+ bool oneshot
+ TimerOption(uint32_t period, std::function<void()> callback, bool oneshot)
+ TimerOption()
}
note left of Timer
外部主要调用类,定时器对象实体
end note
note left of TimerOption
配置参数,主要作为入参构造timer,用于配置定时器
end note
note left of TimingWheel
环形队列,cyberrt内部实现为二级时钟节拍轮,单例
end note
note left of TimerBucket
环形队列中的元素,为链表,存储std::weak_ptr<TimerTask>,
等价线程安全的list<std::weak_ptr<TimerTask>>
end note
Timer --> TimerOption : 入参依赖
Timer --> TimingWheel : 成员依赖
TimingWheel --> TimerBucket : 成员依赖
TimerBucket --> TimerTask : 成员依赖
@enduml
实现原理
timingwheel 中的tick线程用于统计时间时间,并获取指向的时钟节拍论里面的元素进行任务触发,并将任务丢到cyberrt的携程池里运行,timewheel的实现基本都大同小异,这里不细说。
bug
bug主要是call back的生命周期管理问题,在代码中其实已经考虑了一部分(shared_ptr+weak_ptr)已经保证了一部分的悬空指针的问题,但是不完全。
bool Timer::InitTimerTask() {
task_.reset(new TimerTask(timer_id_));
task_->interval_ms = timer_opt_.period;
task_->next_fire_duration_ms = task_->interval_ms;
if (timer_opt_.oneshot) {
std::weak_ptr<TimerTask> task_weak_ptr = task_;
task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {
auto task = task_weak_ptr.lock();
if (task) {
std::lock_guard<std::mutex> lg(task->mutex);
callback();
}
};
} else {
std::weak_ptr<TimerTask> task_weak_ptr = task_;
task_->callback = [callback = this->timer_opt_.callback, task_weak_ptr]() {
auto task = task_weak_ptr.lock();
if (!task) {
return;
}
XXXX //省略
TimingWheel::Instance()->AddTask(task);
};
}
return true;
}
void TimingWheel::Tick() {
auto& bucket = work_wheel_[current_work_wheel_index_];
{
std::lock_guard<std::mutex> lock(bucket.mutex());
auto ite = bucket.task_list().begin();
while (ite != bucket.task_list().end()) {
auto task = ite->lock();
if (task) {
ADEBUG << "index: " << current_work_wheel_index_
<< " timer id: " << task->timer_id_;
auto* callback =
reinterpret_cast<std::function<void()>*>(&(task->callback));
cyber::Async([this, callback] {
if (this->running_) {
(*callback)();
}
});
}
ite = bucket.task_list().erase(ite);
}
}
}
以上代码,构建TimerTask添加到timing wheel中,task为share_ptr,所有权归属timer对象很合理,传递weak_ptr对象給timingWhee,timer对象释放后,节拍轮里面的weak_ptr不再有效进行不必要的触发,这也很正确,合理的考虑了timer对象持有的task和timingwheel中task的异步生命周期管理的问题。
但是在tick函数中,将触发的task放到异步协程池运行的时候则出现问题了,未考虑异步生命周期的问题。
现在假设我们有这样一个场景
创建了一个10ms的周期timer,经过第一个10ms时触发了定时器并将回掉丢到协程池中并推出tick,假设当时协程池比较繁忙,有恰巧我们迅速释放掉timer对象或者stop掉timer对象,you lose,程序boom了。
因为传递task这个share_ptr对象里的callback 成员函数进行了异步调用,tick函数内虽然正确获取了task,退出该函数,task引用计数-1,于此同时我们stop了timer 对象s或者 析构了对象,time持有的task,引用再次-1归0,tick函数内cyber::Async虽然正确获取了task的callback,但此时,已经是悬空指针了。
修改
知道了原因,改起来也很方便,无非就是异步调用指针的管理而已。
void TimingWheel::Tick() {
auto& bucket = work_wheel_[current_work_wheel_index_];
{
std::lock_guard<std::mutex> lock(bucket.mutex());
auto ite = bucket.task_list().begin();
while (ite != bucket.task_list().end()) {
auto task = ite->lock();
if (task) {
ADEBUG << "index: " << current_work_wheel_index_
<< " timer id: " << task->timer_id_;
// auto* callback =
// reinterpret_cast<std::function<void()>*>(&(task->callback));
cyber::Async([this, weakTask = std::weak_ptr<TimerTask>(task)] {
auto task = weakTask->lock();
if (this->running_ && task ) {
task->callback();
}
});
}
ite = bucket.task_list().erase(ite);
}
}
}