Kafka存在大量的延迟操作,比如延迟删除、延迟拉取等。Kafka基于时间轮概念自定义了一个用于延迟操作的定时器。
JDK自带的Timer和DelayQueue缺陷
Timer和DelayQueue都可以插入多个定时任务,它们都使用一个优先级队列来管理任务,复杂度为O(logn)。
Timer |
单线程,前置任务会阻塞后置任务,如果任务抛出异常,Timer会中断停止。 |
DelayQueue |
线程安全,可用于多线程,是一个无界阻塞队列。 |
表 Timer和DelayQueue的对比
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();
delayQueue.offer(new DelayQueueTask("task1",5000));
delayQueue.offer(new DelayQueueTask("task2",2000));
delayQueue.offer(new DelayQueueTask("task3",4000));
System.out.println("开始执行delayQueue任务");
while (!delayQueue.isEmpty()) {
DelayQueueTask task = delayQueue.take();
System.out.println("任务:" + task);
}
System.out.println("delayQueue任务 任务执行完毕");
}
时间轮结构
任务插入及删除O(logn)的复杂度不能满足Kafka高性能要求。时间轮时间复杂度为O(1)。
图 时间轮(TimingWheel)结构
时间轮类似于机械手表,秒针1s前进一次,分针在秒针前进一圈后前进1格。。
tick |
时间跨度。上图第一层tick=1s,第二次tick=10s。 |
wheelSize |
时间格数。每一层格数一样。每层时间周期interval=tick*wheelSize |
currentTick |
每层当前指向的时间格。 |
bucket |
桶,每个时间格中用于保存待执行任务的列表(TimerTaskList)。 |
表 时间轮中的基本概念
// 时间轮添加任务的伪代码:HashedWheelTimer 的 addTask 方法
public void addTask(TimerTask task, long delayMs) {
if (delayMs < current.interval) {
// 插入当前层
int bucketIndex = (currentTick + delayMs / current.tickMs) % current.wheelSize;
current.buckets[bucketIndex].addTask(task);
} else {
// 检查是否存在上层时间轮
if (overflowTimer == null) {
// 动态创建上层时间轮
long nextTickMs = current.tickMs * current.wheelSize; // 上层 tickMs = 当前层总跨度
overflowTimer = new HashedWheelTimer(nextTickMs, current.wheelSize);
}
// 递归调用:将剩余延迟传递给上层
overflowTimer.addTask(task, delayMs - current.interval);
}
}
每次创建上层时间轮时,该层的currentTick初始为0。
时间的推进
时间轮如果像机械手表那样,按照最底层时间跨度一格一格推进,那么将非常耗性能。Kafka使用DelayQueue来推进时间。桶中任务链表按照待执行时间进行排序,其中最快执行的任务放在头部。桶的TimerTaskList将作为DelayQueue一个元素插入,该元素的待执行时间为TimerTaskList的头部元素的时间。
当TimerTaskList被取出执行时,此时会维护各层的currentTick。同时会对列表中还有剩余时间的任务进行“时间轮降级”,将它们插入到对于的桶中。