TimerTaskEntry
TimerTaskEntry
是 Kafka 定时器实现中的一个核心内部类。从它的设计和功能来看,它扮演着一个“任务包装器”和“链表节点”的双重角色,是连接 TimerTask
(具体任务) 和 TimerTaskList
(任务列表) 的桥梁。
public final class TimerTaskEntry {
// ...
}
public final class
:TimerTaskEntry
被声明为final
,意味着它不能被任何其他类继承。这表明它是一个功能完整、不希望被修改或扩展的工具类。在 Kafka 的定时器体系中,它是一个具体的数据结构。- 核心作用: 它的主要作用是包装一个待执行的
TimerTask
,并为其附加元数据(如过期时间expirationMs
),同时通过next
和prev
指针,使自己能作为一个节点存在于一个双向链表中。这个双向链表就是TimerTaskList
。
核心成员变量
// ... existing code ...
public final class TimerTaskEntry {
public final TimerTask timerTask;
public final long expirationMs;
volatile TimerTaskList list;
TimerTaskEntry next;
TimerTaskEntry prev;
// ... existing code ...
public final TimerTask timerTask
: 这是TimerTaskEntry
所包装的真正需要执行的任务。TimerTask
是一个抽象类,定义了任务的具体逻辑(在run()
方法中)。final
关键字确保一个TimerTaskEntry
对象一旦创建,其内部的任务就不会被改变。public final long expirationMs
: 这是一个绝对的过期时间戳(毫秒),表示这个任务应该在这个时间点被执行。final
确保了任务的过期时间在创建后也是不可变的。定时器(TimingWheel
)会根据这个时间来决定将任务项放入哪个“时间格”(TimerTaskList
)。volatile TimerTaskList list
: 这个变量非常关键。它指向当前TimerTaskEntry
所在的TimerTaskList
。volatile
: 这个关键字是实现线程安全的核心。它保证了当一个线程修改了list
变量的值后,其他线程能够立即看到这个修改。这在多线程环境下至关重要,因为一个任务项可能会被一个线程从一个列表移动到另一个列表(例如,当时间轮滚动时),而另一个线程可能同时尝试取消或移除这个任务。
TimerTaskEntry next
和TimerTaskEntry prev
: 这两个字段是标准的双向链表指针,分别指向链表中的下一个和上一个TimerTaskEntry
。这使得在TimerTaskList
中添加和删除任务项的操作可以达到 O(1) 的时间复杂度。
构造函数
// ... existing code ...
public TimerTaskEntry(
TimerTask timerTask,
long expirationMs
) {
this.timerTask = timerTask;
this.expirationMs = expirationMs;
// if this timerTask is already held by an existing timer task entry,
// setTimerTaskEntry will remove it.
if (timerTask != null) {
timerTask.setTimerTaskEntry(this);
}
}
// ... existing code ...
构造函数除了初始化 timerTask
和 expirationMs
外,最重要的一步是调用 timerTask.setTimerTaskEntry(this)
。这建立了一个双向引用:TimerTaskEntry
持有 TimerTask
,同时 TimerTask
也持有其对应的 TimerTaskEntry
。
这个设计有一个非常巧妙的作用:自动处理任务的重调度。让我们看一下 TimerTask
中的 setTimerTaskEntry
方法:
// ... existing code ...
final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task entry,
// we will remove such an entry first.
if (timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
}
// ... existing code ...
当为一个已经存在于某个定时列表中的 TimerTask
创建一个新的 TimerTaskEntry
时(相当于重新调度),setTimerTaskEntry
方法会先将旧的 TimerTaskEntry
从它所在的列表中移除(timerTaskEntry.remove()
),然后再将引用指向新的 entry
。这样就保证了一个 TimerTask
在任何时候只与一个有效的 TimerTaskEntry
相关联,避免了重复执行。
cancelled()
// ... existing code ...
public boolean cancelled() {
return timerTask.getTimerTaskEntry() != this;
}
// ... existing code ...
这个方法用于判断任务是否已被取消。它的逻辑是检查其内部的 timerTask
是否还指向当前的 TimerTaskEntry
实例。如果不是,就意味着该任务项失效(被取消)了。这可能由以下两种情况导致:
- 显式取消: 代码直接调用了
timerTask.cancel()
,该方法会将其内部的timerTaskEntry
引用设为null
。 - 隐式取消 (重调度): 如上文所述,当为同一个
timerTask
创建了一个新的TimerTaskEntry
时,旧的TimerTaskEntry
就被隐式地取消了。
remove()
// ... existing code ...
public void remove() {
TimerTaskList currentList = list;
// If remove is called when another thread is moving the entry from a task entry list to another,
// this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
// In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
while (currentList != null) {
currentList.remove(this);
currentList = list;
}
}
// ... existing code ...
这是 TimerTaskEntry
中最能体现其并发设计精髓的方法。它的目标是将当前任务项从它所在的 TimerTaskList
中安全地移除。
while
循环: 这个循环是为了处理一个棘手的并发场景。- 场景: 线程 A 想要移除这个
entry
,同时线程 B(通常是时间轮的推进线程)正在将这个entry
从一个TimerTaskList
移动到另一个。 - 过程:
- 线程 A 执行
currentList = list
,获取了entry
当前所在的列表(比如listA
)。 - 此时,线程 B 开始执行,它将
entry
从listA
中移除,并将其加入listB
,然后更新了entry.list
的值为listB
。 - 线程 A 继续执行
currentList.remove(this)
,它尝试从listA
中移除entry
。但此时entry
已经不在listA
中了,所以这个操作失败了。 - 如果没有
while
循环,remove()
方法就会在没有成功移除entry
的情况下返回。 - 有了
while
循环,线程 A 会执行currentList = list
重新读取entry.list
的值(由于list
是volatile
的,它能看到线程 B 的修改,现在currentList
变成了listB
),然后再次尝试remove
,直到成功将entry
从它当前所在的列表中移除。当TimerTaskList.remove()
成功后,它会将entry.list
设为null
,此时循环才会终止。
- 线程 A 执行
- 场景: 线程 A 想要移除这个
总结
TimerTaskEntry
是 Kafka 高性能定时器(时间轮)算法的基石。它不仅仅是一个数据载体,更通过精巧的并发设计,解决了任务调度、取消和重调度过程中的线程安全问题。
- 它通过双向链表结构,实现了在
TimerTaskList
中任务的快速增删。 - 它通过
timerTask
和TimerTaskEntry
的双向引用,巧妙地实现了任务的重调度逻辑。 - 它通过
volatile
关键字和remove()
方法中的重试循环,优雅地处理了多线程下的竞态条件,保证了数据的一致性和操作的原子性。
TimerTask
TimerTask
是 Kafka 定时器功能中代表“待执行任务”的基石。它是一个抽象类,用户需要继承它并实现具体的任务逻辑。它与我们之前分析的 TimerTaskEntry
紧密协作,共同构成了 Kafka 高效时间轮定时器的核心部分。
public abstract class TimerTask implements Runnable {
// ...
}
public abstract class TimerTask
: 这定义了一个抽象类,意味着你不能直接创建TimerTask
的实例 (new TimerTask(...)
)。你必须创建一个继承自它的子类,并实现其抽象方法。这是一种典型的模板方法设计模式,它定义了任务的基本框架和生命周期管理(如取消、重调度),同时将具体的执行逻辑交由子类实现。implements Runnable
: 这是该类一个非常关键的设计决策。通过实现Runnable
接口,任何TimerTask
的子类实例都可以被标准的 JavaExecutorService
(线程池) 直接执行。这使得 Kafka 的定时器系统可以将 计时(何时执行)和 执行(如何执行)这两个关注点完全分离。时间轮(TimingWheel
)负责在正确的时间点找出到期的任务,然后将这个TimerTask
对象抛给一个线程池去执行,而不需要关心任务内部的具体逻辑。
核心成员变量
// ... existing code ...
public abstract class TimerTask implements Runnable {
private volatile TimerTaskEntry timerTaskEntry;
// timestamp in millisecond
public final long delayMs;
// ... existing code ...
private volatile TimerTaskEntry timerTaskEntry
: 这是TimerTask
与其在定时器数据结构中的“代理”——TimerTaskEntry
——之间的连接纽G带。volatile
: 这个关键字至关重要,它确保了timerTaskEntry
字段的修改在多线程之间立即可见。考虑以下场景:一个线程调用cancel()
方法将timerTaskEntry
设置为null
,而另一个定时器线程正在检查这个任务是否被取消。volatile
保证了定时器线程能立即看到这个变化。- 状态指示器: 这个字段也隐式地代表了任务的状态。如果它为
null
,意味着任务要么从未被调度,要么已经被取消。如果它不为null
,则表示任务正处于调度队列中。
public final long delayMs
: 这个字段表示任务的延迟执行时间(以毫秒为单位)。这是一个相对时间。当用户将此任务添加到定时器时,定时器会用当前时间加上这个delayMs
来计算出任务的绝对过期时间expirationMs
,并用该绝对时间来创建TimerTaskEntry
。final
关键字确保了任务的延迟一旦设定就不能更改。
构造函数
// ... existing code ...
public TimerTask(long delayMs) {
this.delayMs = delayMs;
}
// ... existing code ...
构造函数很简单,就是用来设置任务的延迟时间 delayMs
。
cancel()
和 isCancelled()
// ... existing code ...
public void cancel() {
synchronized (this) {
if (timerTaskEntry != null) timerTaskEntry.remove();
timerTaskEntry = null;
}
}
public boolean isCancelled() {
return timerTaskEntry == null;
}
// ... existing code ...
cancel()
: 这个方法用于取消一个已经调度的任务。synchronized (this)
: 对this
对象加锁,以确保在多线程环境下取消操作的原子性,防止与setTimerTaskEntry
等方法发生冲突。if (timerTaskEntry != null) timerTaskEntry.remove();
: 如果任务当前正处于调度中(timerTaskEntry
不为null
),它会调用其关联的TimerTaskEntry
的remove()
方法。这会将任务项从它所在的TimerTaskList
(双向链表)中移除。这里体现了TimerTask
和TimerTaskEntry
之间的委托关系。timerTaskEntry = null;
: 将引用设为null
,彻底切断与TimerTaskEntry
的关联,并将任务标记为“已取消”。
isCancelled()
: 提供了一个简单、快速的方式来检查任务是否已被取消。
setTimerTaskEntry(TimerTaskEntry entry)
// ... existing code ...
final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task entry,
// we will remove such an entry first.
if (timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
}
// ... existing code ...
这个方法是包级私有的(final
阻止子类重写),由 TimerTaskEntry
的构造函数调用,用于建立 TimerTask
和 TimerTaskEntry
之间的双向链接。
- 处理重调度: 这个方法最精妙之处在于它对任务重调度的处理。
if (timerTaskEntry != null && timerTaskEntry != entry)
这个判断检查了当前TimerTask
是否已经关联了一个旧的TimerTaskEntry
。如果是(意味着这个任务之前被调度过,现在要重新调度),它会先调用旧entry
的remove()
方法将其从定时器队列中移除,然后再关联到新的entry
。这优雅地保证了一个TimerTask
实例在任何时刻最多只在定时器中存在一份,避免了混乱和重复执行。
抽象方法 run()
TimerTask
实现了 Runnable
接口,但它本身没有提供 run()
方法的实现(Runnable
接口中的 run
方法不是 abstract
的,但 TimerTask
类是 abstract
的,并且没有实现 run
,所以继承者必须实现)。任何想要被执行的具体任务都必须继承 TimerTask
并提供 run()
方法的具体实现。
例如,在测试代码中我们能看到很多这样的例子:
// ... existing code ...
private static class TestTask extends TimerTask {
// ... existing code ...
TestTask(
long delayMs,
// ... existing code ...
) {
super(delayMs);
// ... existing code ...
}
@Override
public void run() {
if (completed.compareAndSet(false, true)) {
synchronized (output) {
output.add(id);
}
latch.countDown();
}
}
}
// ... existing code ...
这个 TestTask
继承了 TimerTask
,并在 run()
方法中定义了具体的执行逻辑:记录ID并对 latch
进行倒数。
总结
TimerTask
是 Kafka 定时器系统中用户与之交互的直接接口。它通过抽象类的设计,强制用户关注于任务的业务逻辑(实现 run
方法),而将复杂的调度、取消、重调度和线程安全等问题封装在基类和 TimerTaskEntry
的交互之中。它与 TimerTaskEntry
之间的双向引用和委托机制是整个设计中最为精妙的部分,实现了高效且线程安全的任务生命周期管理。
TimerTaskList
TimerTaskList
是 Kafka 高效时间轮 (TimingWheel
) 算法的核心数据结构之一。它代表了时间轮上的一个“桶”(bucket),这个桶里存放着所有过期时间落在同一个时间区间的定时任务。
可以把它想象成时钟表盘上的一个刻度(比如“3点钟”位置),所有应该在“3点钟”这个时间片内到期的任务,都会被放进这个刻度对应的 TimerTaskList
中。
class TimerTaskList implements Delayed {
// ...
}
class TimerTaskList
: 这是一个包级私有的类,意味着它只为timer
包内的其他类(主要是TimingWheel
)服务,是内部实现细节,不对外暴露。implements Delayed
: 这是该类最关键的特性之一。Delayed
是java.util.concurrent
包下的一个接口,实现了它的对象可以被放入DelayQueue
中。DelayQueue
是一个无界的阻塞队列,它只会在元素的延迟时间到期后才允许被取出。这正是TimingWheel
实现其功能的基石:TimingWheel
将代表不同时间刻度(桶)的TimerTaskList
放入一个DelayQueue
中。DelayQueue
会自动根据每个TimerTaskList
的过期时间进行排序。TimingWheel
的工作线程只需要从DelayQueue
中take()
元素。只有当一个TimerTaskList
(桶)整体过期时,take()
操作才会返回,从而让工作线程开始处理这个桶里的所有任务。
带哨兵节点的双向循环链表
// ... existing code ...
// TimerTaskList forms a doubly linked cyclic list using a dummy root entry
// root.next points to the head
// root.prev points to the tail
private final TimerTaskEntry root;
// ... existing code ...
TimerTaskList(
AtomicInteger taskCounter,
Time time
) {
// ... existing code ...
this.root = new TimerTaskEntry(null, -1L);
this.root.next = root;
this.root.prev = root;
}
// ... existing code ...
TimerTaskList
内部使用了一个带哨兵节点(dummy root entry)的双向循环链表来存储 TimerTaskEntry
(任务条目)。
private final TimerTaskEntry root
: 这个root
就是哨兵节点。它本身不存储任何有效的任务数据。root.next
: 指向链表的第一个实际任务节点(head)。root.prev
: 指向链表的最后一个实际任务节点(tail)。- 循环链表: 链表的最后一个节点的
next
指向root
,第一个节点的prev
也指向root
。 - 空链表: 当链表为空时,
root.next
和root.prev
都指向root
自身。
使用这种结构的好处是极大地简化了链表的插入和删除操作。无论是插入到头部、尾部,还是删除任意一个节点,算法都是统一的,无需对头、尾节点或空链表做特殊的边界条件判断,代码更简洁、高效。
核心成员变量
// ... existing code ...
class TimerTaskList implements Delayed {
private final Time time;
private final AtomicInteger taskCounter;
private final AtomicLong expiration;
// ... existing code ...
private final Time time
: 持有一个Time
实例,用于获取当前时间,主要在getDelay()
方法中使用。private final AtomicInteger taskCounter
: 这是一个共享的原子计数器。同一个TimingWheel
的所有TimerTaskList
共享同一个taskCounter
。它用于跟踪整个TimingWheel
中的任务总数。private final AtomicLong expiration
: 代表这个桶(TimerTaskList
)的过期时间戳。DelayQueue
就是根据这个值来决定何时可以将此TimerTaskList
从队列中取出。-1L
是一个特殊值,表示该桶还未设置过期时间或已被清空。
add(TimerTaskEntry timerTaskEntry)
这是将一个任务条目添加进本链表(桶)的方法。
// ... existing code ...
public void add(TimerTaskEntry timerTaskEntry) {
boolean done = false;
while (!done) {
// ...
timerTaskEntry.remove();
synchronized (this) {
synchronized (timerTaskEntry) {
if (timerTaskEntry.list == null) {
// put the timer task entry to the end of the list.
TimerTaskEntry tail = root.prev;
timerTaskEntry.next = root;
timerTaskEntry.prev = tail;
timerTaskEntry.list = this;
tail.next = timerTaskEntry;
root.prev = timerTaskEntry;
taskCounter.incrementAndGet();
done = true;
}
}
}
}
}
// ... existing code ...
while (!done)
和timerTaskEntry.remove()
: 这个设计非常精妙,用于处理任务的重调度。一个TimerTask
可能先被加到一个桶里,之后又被加到另一个桶里。timerTaskEntry.remove()
会尝试将这个entry
从它之前所在的链表中移除。这个操作放在synchronized
块之外是为了避免潜在的死锁。- 双重
synchronized
:synchronized (this)
锁住当前链表,synchronized (timerTaskEntry)
锁住要添加的条目。这确保了在检查和修改timerTaskEntry.list
时的原子性,防止并发冲突。 - 链表操作:
if (timerTaskEntry.list == null)
确认该任务条目当前不属于任何链表后,执行标准的双向链表尾部插入操作,并更新taskCounter
。
死锁说明
将 timerTaskEntry.remove()
放在 synchronized (this)
块之外是为了防止死锁。我们来分析一下死锁是如何发生的。
假设我们有两个 TimerTaskList
对象,我们称之为 listA
和 listB
。同时,有两个线程,Thread1
和 Thread2
。
现在,考虑一种场景:Thread1
想要把一个任务 taskEntry
从 listA
移动到 listB
,而几乎在同一时间,Thread2
想要把同一个 taskEntry
从 listB
移动到 listA
。(虽然这个场景在 TimingWheel
的正常逻辑中不常见,但在并发程序设计中,必须考虑这种可能性,尤其是当任务可以被取消和重新调度时)。
让我们看看如果 remove()
操作在 synchronized
块内部,会发生什么:
死锁场景模拟 (错误的设计):
// 假设这是 add 方法的错误实现
public void add(TimerTaskEntry timerTaskEntry) {
boolean done = false;
while (!done) {
synchronized (this) { // 1. 线程先锁住目标 list
timerTaskEntry.remove(); // remove 也在锁内部
// ... 插入逻辑 ...
}
}
}
// 假设这是 remove 方法
public synchronized void remove(TimerTaskEntry timerTaskEntry) { // 2. remove 会锁住它所在的 list
synchronized (timerTaskEntry) {
// ...
}
}
现在,我们来看死锁的发生步骤:
Thread1
调用listB.add(taskEntry)
:Thread1
进入listB.add
方法,并成功获取了listB
的锁 (synchronized (this)
,这里的this
是listB
)。Thread1
准备执行taskEntry.remove()
。假设taskEntry
当前在listA
中。
Thread2
调用listA.add(taskEntry)
:- 几乎同时,
Thread2
进入listA.add
方法,并成功获取了listA
的锁 (synchronized (this)
,这里的this
是listA
)。 Thread2
准备执行taskEntry.remove()
。假设taskEntry
当前在listA
中(或者在Thread1
操作前在listB
中,效果类似)。
- 几乎同时,
死锁形成:
Thread1
(持有listB
的锁) 调用taskEntry.remove()
。remove()
方法会去调用listA.remove(taskEntry)
。listA.remove
方法是synchronized
的,所以它会尝试获取listA
的锁。但是,listA
的锁已经被Thread2
持有。所以Thread1
开始等待listA
的锁。Thread2
(持有listA
的锁) 调用taskEntry.remove()
。假设它需要从listB
中移除(或者说,它的逻辑需要和listB
交互),它会尝试获取listB
的锁。但是,listB
的锁已经被Thread1
持有。所以Thread2
开始等待listB
的锁。
现在,情况是:
Thread1
持有listB
的锁,等待listA
的锁。Thread2
持有listA
的锁,等待listB
的锁。
这就是一个经典的死锁。两个线程都在互相等待对方释放自己需要的资源,谁也无法继续执行,程序被永久挂起。
正确的设计如何避免死锁
现在我们来看正确实现:
// ... existing code ...
public void add(TimerTaskEntry timerTaskEntry) {
boolean done = false;
while (!done) {
// Remove the timer task entry if it is already in any other list
// We do this outside of the sync block below to avoid deadlocking.
// We may retry until timerTaskEntry.list becomes null.
timerTaskEntry.remove();
synchronized (this) {
synchronized (timerTaskEntry) {
// ... existing code ...
通过将 timerTaskEntry.remove()
移到 synchronized (this)
外部,打破了死锁的循环等待条件。
在 add
方法中,线程不会在持有任何 TimerTaskList
锁的情况下去尝试获取另一个 TimerTaskList
的锁。remove()
操作本身是独立的,它会获取它需要操作的 list
的锁,但调用 add
的线程此刻并没有持有任何 list
的锁。
remove()
方法的实现如下:
// TimerTaskEntry.java
public void remove() {
TimerTaskList currentList = list;
while (currentList != null) {
currentList.remove(this); // 这个 remove 是 TimerTaskList 的方法
currentList = list;
}
}
// TimerTaskList.java
public synchronized void remove(TimerTaskEntry timerTaskEntry) {
// ...
}
TimerTaskEntry.remove()
会调用 TimerTaskList.remove()
,后者会锁住 list
。但是,调用 add
的线程在调用 timerTaskEntry.remove()
时,并没有持有任何 list
的锁,所以不会构成循环等待。
简而言之,这种设计遵循了一个重要的并发编程原则:如果要获取多个锁,请始终保证以相同的顺序获取它们。在这里,通过将 remove
操作分离出来,避免了在一个线程持有 lockA
的同时去请求 lockB
,而另一个线程持有 lockB
去请求 lockA
的情况。
remove(TimerTaskEntry timerTaskEntry)
从链表中移除一个任务条目。
// ... existing code ...
public synchronized void remove(TimerTaskEntry timerTaskEntry) {
synchronized (timerTaskEntry) {
if (timerTaskEntry.list == this) {
timerTaskEntry.next.prev = timerTaskEntry.prev;
timerTaskEntry.prev.next = timerTaskEntry.next;
timerTaskEntry.next = null;
timerTaskEntry.prev = null;
timerTaskEntry.list = null;
taskCounter.decrementAndGet();
}
}
}
// ... existing code ...
逻辑很清晰:如果 timerTaskEntry
确实属于当前链表,就执行标准的双向链表删除操作,并更新 taskCounter
。
flush(Consumer<TimerTaskEntry> f)
清空整个链表,并对每个被移除的任务条目执行给定的操作 f
。
// ... existing code ...
public synchronized void flush(Consumer<TimerTaskEntry> f) {
TimerTaskEntry head = root.next;
while (head != root) {
remove(head);
f.accept(head);
head = root.next;
}
expiration.set(-1L);
}
// ... existing code ...
当一个 TimerTaskList
(桶)到期后,TimingWheel
会调用此方法。f
这个 Consumer
的逻辑通常是将任务重新插入到时间轮的下一层(如果存在overflowWheel
)或者直接提交给线程池执行。
Delayed
接口的实现
// ... existing code ...
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(Math.max(getExpiration() - time.hiResClockMs(), 0), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
TimerTaskList other = (TimerTaskList) o;
return Long.compare(getExpiration(), other.getExpiration());
}
// ... existing code ...
getDelay(...)
: 计算当前时间距离本桶的过期时间expiration
还有多久。DelayQueue
正是利用这个方法来判断一个元素是否到期。compareTo(...)
: 比较两个TimerTaskList
的过期时间。DelayQueue
内部(通常是PriorityQueue
)用它来对所有桶进行排序,确保过期时间最早的桶排在最前面。
总结
TimerTaskList
是 Kafka 时间轮定时器的一个高度优化的内部组件。它巧妙地结合了多种技术:
Delayed
接口: 使其能够被DelayQueue
管理,实现了高效的到期检查,避免了工作线程的空轮询。- 双向循环链表: 提供了 O(1) 复杂度的任务添加和删除操作。
- 哨兵节点: 简化了链表操作的实现,避免了边界检查。
- 精细的并发控制: 通过
synchronized
和Atomic
变量,确保了在多线程环境下的数据一致性和线程安全,并巧妙地避免了死锁。
它作为时间轮上的“桶”,高效地组织和管理着成百上千的定时任务,是 Kafka 实现高性能、低延迟定时任务调度的基石。
TimingWheel
(时间轮)
TimingWheel
是一种高效的、用于实现大量定时任务调度的算法结构。相比于传统的基于优先队列(PriorityQueue
)的定时器(其添加/删除操作的时间复杂度为 O(log n)),时间轮可以实现近乎 O(1) 的添加和删除操作,这在需要管理成千上万个定时任务的场景下(例如 Kafka 中的请求超时、延迟操作等)具有巨大的性能优势。
你可以把一个 TimingWheel
想象成一个时钟的表盘。
tickMs
(滴答间隔): 相当于时钟秒针跳动一下的时间间隔,是时间轮的最小精度单位。wheelSize
(轮盘大小): 相当于时钟表盘上的刻度数(比如 60 个刻度)。interval
(一圈的时间):tickMs * wheelSize
,相当于秒针走完一圈的总时间。buckets
(桶): 表盘上的每一个刻度都是一个“桶”(TimerTaskList
),用来存放应该在该刻度时间点到期的所有任务。currentTimeMs
(当前时间指针): 相当于时钟上指向当前时间的指针。它不是实时更新的,而是以tickMs
为单位向前推进。
当一个定时任务(TimerTask
)被添加进来时,我们会计算它的到期时间应该落在哪个桶里,然后把它放进去。时间轮的指针会随着时间的流逝(通过 advanceClock
方法驱动)一格一格地向前走。当指针指向某个桶时,就意味着这个桶里的所有任务都到期了,需要被执行。
分层时间轮 (Hierarchical Timing Wheels)
一个简单的时间轮只能处理 interval
时间范围内的任务。如果一个任务的延迟时间非常长,超出了当前轮盘一圈所能表示的时间范围,怎么办?这就是分层时间轮设计的用武之地。
// ... existing code ...
// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
private volatile TimingWheel overflowWheel = null;
// ... existing code ...
overflowWheel
: 每个时间轮都有一个指向更高层级时间轮的引用,称为overflowWheel
(上层时间轮)。- 层级关系: 如果当前时间轮(第一层)的
tickMs
是 1ms,wheelSize
是 20,那么它能表示的时间范围就是 20ms。它的overflowWheel
(第二层)的tickMs
就会是1ms * 20 = 20ms
。第二层的overflowWheel
(第三层)的tickMs
就是20ms * 20 = 400ms
,以此类推。 - 任务降级: 当一个非常长延时的任务(比如 500ms 后到期)被添加到第一层时间轮时,第一层发现处理不了(超出了 20ms 的范围),就会把它“扔”给它的
overflowWheel
(第二层)。第二层发现也处理不了(超出了 400ms 的范围),就继续“扔”给第三层。第三层发现 500ms 在自己的处理范围内,就将任务放入相应的桶中。 - 任务重插 (Re-insert): 当高层时间轮的指针转动时,比如第三层时间轮中一个存放
[400ms, 800ms)
任务的桶到期了,它并不会直接执行里面的任务。而是会把这个桶里的所有任务重新添加回第一层时间轮。这时,这些任务的剩余延迟时间已经变短了,第一层或第二层时间轮就能处理它们了。这个过程就像把一个粗略的时间范围不断细化,最终在最精确的那一层被执行。
核心成员变量
// ... existing code ...
public class TimingWheel {
private final long tickMs;
private final int wheelSize;
private final AtomicInteger taskCounter;
private final DelayQueue<TimerTaskList> queue;
private final long interval;
private final TimerTaskList[] buckets;
private long currentTimeMs;
// ...
private volatile TimingWheel overflowWheel = null;
// ... existing code ...
tickMs
,wheelSize
,interval
,overflowWheel
: 如上所述,定义了时间轮的基本属性和层级关系。taskCounter
: 一个全局共享的原子计数器,用于统计所有层级的时间轮中总共有多少任务。queue
: 一个java.util.concurrent.DelayQueue
。这是整个时间轮系统驱动的核心。每个桶 (TimerTaskList
) 都是一个Delayed
元素。当一个桶被设置了过期时间后,它会被放入这个DelayQueue
。只有一个后台线程需要从这个队列里取元素,delayQueue.poll()
会一直阻塞,直到有一个桶到期。这种方式远比循环检查所有任务是否到期要高效得多。buckets
:TimerTaskList
数组,时间轮的核心数据结构,代表了轮盘上的所有桶。currentTimeMs
: 时间轮的当前时间指针,以tickMs
为单位对齐。
add(TimerTaskEntry timerTaskEntry)
这是添加任务的入口。
// ... existing code ...
public boolean add(TimerTaskEntry timerTaskEntry) {
long expiration = timerTaskEntry.expirationMs;
if (timerTaskEntry.cancelled()) {
// ... 已取消
return false;
} else if (expiration < currentTimeMs + tickMs) {
// ... 已过期
return false;
} else if (expiration < currentTimeMs + interval) {
// 1. 任务在当前轮的处理范围内
long virtualId = expiration / tickMs;
int bucketId = (int) (virtualId % (long) wheelSize);
TimerTaskList bucket = buckets[bucketId];
bucket.add(timerTaskEntry);
// 2. 设置桶的过期时间,如果成功(意味着这是个新到期时间的桶),则加入 DelayQueue
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket);
}
return true;
} else {
// 3. 超出当前轮范围,交给上层 overflowWheel 处理
if (overflowWheel == null) addOverflowWheel();
return overflowWheel.add(timerTaskEntry);
}
}
// ... existing code ...
这个方法的逻辑非常清晰:
- 检查任务是否已取消或已过期。
- 如果任务的到期时间在当前时间轮的一圈 (
interval
) 之内,就计算它应该属于哪个桶 (bucketId
),然后将任务添加到该桶中。 bucket.setExpiration()
是一个关键调用。它会设置这个桶的整体过期时间。如果这个桶的过期时间被更新了(比如之前是空的,或者过期时间已经过了被重用),该方法返回true
,然后这个桶就会被放入DelayQueue
等待被调度。- 如果任务的到期时间超出了当前轮的范围,就递归地调用
overflowWheel.add()
,把任务交给上层处理。如果上层时间轮还不存在,就通过addOverflowWheel()
按需创建。
addOverflowWheel()
// ... existing code ...
private synchronized void addOverflowWheel() {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(
interval,
wheelSize,
currentTimeMs,
taskCounter,
queue
);
}
}
// ... existing code ...
这是一个使用双重检查锁定模式(Double-Checked Locking)按需创建上层时间轮的方法。注意 overflowWheel
成员变量被声明为 volatile
,这是为了保证在多线程环境下该模式的正确性。
这里的关键在于第一个参数 interval
。我们来看一下 interval
是如何定义的:
// ... existing code ...
TimingWheel(
long tickMs,
int wheelSize,
long startMs,
AtomicInteger taskCounter,
DelayQueue<TimerTaskList> queue
) {
// ... existing code ...
this.interval = tickMs * wheelSize;
// ... existing code ...
}
// ... existing code ...
在当前时间轮的构造函数中,this.interval
被计算为 tickMs * wheelSize
。
当我们创建 overflowWheel
时,我们把当前轮的 interval
作为下一轮的 tickMs
传递了进去。
所以,层级关系是这样的:
当前轮 (level N):
tickMs
=T
interval
=T * wheelSize
上一层轮 (level N+1, 即
overflowWheel
):- 它的
tickMs
被设置为当前轮的interval
,也就是T * wheelSize
。 - 它的
interval
将会是(T * wheelSize) * wheelSize
,即T * wheelSize^2
。
- 它的
因此,间隔确实变长了。overflowWheel
的时间粒度(tickMs
)是当前轮的 interval
,这正是分层时间轮实现大跨度时间范围的核心机制。每一层都比下一层的时间粒度粗一个 wheelSize
的数量级。
advanceClock(long timeMs)
这个方法用于推动时间轮的指针前进。它通常由外部的调度线程在从 DelayQueue
中取出一个到期的桶后调用。
// ... existing code ...
public void advanceClock(long timeMs) {
if (timeMs >= currentTimeMs + tickMs) {
currentTimeMs = timeMs - (timeMs % tickMs);
// Try to advance the clock of the overflow wheel if present
if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs);
}
}
}
它会把当前时间轮的 currentTimeMs
更新到 timeMs
(并向下对齐到 tickMs
的整数倍),然后递归地调用上层时间轮的 advanceClock
,以确保所有层级的时间指针保持同步。
TimingWheel
如何被使用和消费?
TimingWheel
本身只负责组织和存放定时任务,它像一个数据结构,并不主动消费任务。“具体怎么使用和消费”是由一个外部的驱动者来完成的,在 Kafka 的实现中,这个驱动者通常是 SystemTimer
类。
整个流程是这样的:
任务添加:
- 外部调用
SystemTimer.add(timerTask)
。 SystemTimer
会把任务包装成TimerTaskEntry
,并调用TimingWheel.add()
将其放入时间轮的某个桶 (TimerTaskList
) 中。- 如果这个桶是新启用的,它会被
offer
到一个所有TimingWheel
实例共享的DelayQueue
中。
- 外部调用
任务消费(驱动核心):
SystemTimer
内部有一个单独的工作线程(通常叫做reaper
线程,即收割者线程)。- 这个线程的核心逻辑就是一个循环,它会阻塞式地从
DelayQueue
中获取到期的元素:delayQueue.poll(timeout, ...)
。 poll()
只会在最近一个将要到期的桶 (TimerTaskList
) 到达其过期时间时才会返回。这极大地提升了效率,避免了空轮询。
时钟推进和任务处理:
- 一旦
poll()
返回了一个到期的bucket
(TimerTaskList
),工作线程就会被唤醒。 - 然后,
SystemTimer
会调用TimingWheel.advanceClock(bucket.getExpiration())
,将时间轮的指针推进到这个桶的过期时间。 - 接着,调用
bucket.flush(this::addTimerTaskEntry)
。这个方法会遍历桶里所有的任务:- 对于真正到期的任务,
flush
会将它们提交给一个ExecutorService
(线程池) 去执行。 - 对于从高层时间轮降级下来的任务(它们的实际过期时间还未到),
flush
会调用addTimerTaskEntry
将它们重新插入到时间轮的底层,进行更精确的调度。
- 对于真正到期的任务,
- 工作线程会继续
poll()
,处理下一个到期的桶,如此循环往复。
- 一旦
我们可以看一下 SystemTimer.java
中的关键代码片段来印证这个流程:
// ... existing code ...
public boolean advanceClock(long timeoutMs) throws InterruptedException {
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (bucket != null) {
writeLock.lock();
try {
while (bucket != null) {
timingWheel.advanceClock(bucket.getExpiration());
bucket.flush(this::addTimerTaskEntry);
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return true;
} else {
return false;
}
}
// ... existing code ...
这段代码清晰地展示了 SystemTimer
是如何驱动 TimingWheel
工作的:从 delayQueue
获取到期的桶 -> 推进时钟 -> flush
桶内任务。
TimingWheel
是一个被动的、精巧的任务存储结构。SystemTimer
是主动的驱动者,它利用一个工作线程和DelayQueue
来消费TimingWheel
中的到期任务,并驱动时间轮的时钟前进。
总结
TimingWheel
是一个设计精巧、性能卓越的定时任务调度器。
优点:
- 高性能: 添加和删除任务的时间复杂度接近 O(1)(严格来说是 O(m),m为层数,通常很小)。
- 高扩展性: 通过分层设计,可以支持任意长时间的延迟任务。
- 低资源消耗: 通过
DelayQueue
驱动,避免了CPU的空轮询。
线程安全说明:
- 如类注释中所说
This class is not thread-safe
。具体来说,add
和advanceClock
不能并发调用。调用者(如SystemTimer
)需要通过锁来保证advanceClock
和任务重插(bucket.flush
)期间,没有新的任务被add
进来。 - 但是,多个线程并发调用
add
是线程安全的。
- 如类注释中所说
TimingWheel
是 Kafka 服务端处理大量网络请求超时、延迟操作(如延迟生产/消费请求)等场景的基石,完美体现了用合适的数据结构解决特定问题的工程智慧。
SystemTimer
SystemTimer
是 Kafka 中时间轮 (TimingWheel
) 机制的驱动者和门面 (Facade)。如果说 TimingWheel
是一个精密但被动的时钟机械装置,那么 SystemTimer
就是给这个装置提供动力、发条,并对外提供统一接口的那个外壳。它实现了 Timer
接口,将底层复杂的 TimingWheel
、DelayQueue
和线程管理封装起来,为上层应用提供简单易用的定时任务服务。
SystemTimer
的核心职责有三个:
- 提供接口 (Facade): 实现
Timer
接口中的add(TimerTask)
,advanceClock(long)
,size()
和close()
方法,向上层屏蔽底层实现的复杂性。 - 管理线程: 内部维护一个
ExecutorService
(taskExecutor
) 用于执行到期的任务,以及一个隐藏的 "Reaper" 线程(收割者线程,通常在SystemTimerReaper
类中)来驱动时钟前进。 - 协调组件: 它是连接
TimingWheel
、DelayQueue
和taskExecutor
这三个核心组件的桥梁,负责它们之间的交互和线程同步。
核心成员变量
// ... existing code ...
public class SystemTimer implements Timer {
public static final String SYSTEM_TIMER_THREAD_PREFIX = "executor-";
// timeout timer
private final ExecutorService taskExecutor;
private final DelayQueue<TimerTaskList> delayQueue;
private final AtomicInteger taskCounter;
private final TimingWheel timingWheel;
// Locks used to protect data structures while ticking
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
// ... existing code ...
taskExecutor
: 一个单线程的ExecutorService
。所有到期的TimerTask
的run()
方法最终都会被提交到这个线程池中执行。这确保了任务的执行不会阻塞SystemTimer
的主驱动线程(Reaper 线程)。delayQueue
: 一个DelayQueue
实例。这是整个定时器系统的脉搏。它存放着TimingWheel
中所有非空的、设置了过期时间的任务列表 (TimerTaskList
)。驱动线程会阻塞地从这个队列中获取到期的任务列表。taskCounter
: 一个原子整数,用于记录当前定时器中待处理的任务总数。这个计数器被TimingWheel
和SystemTimer
共享。timingWheel
:SystemTimer
内部持有的TimingWheel
实例,是所有定时任务的实际存储容器。readWriteLock
: 一个读写锁。这是保证线程安全的关键。- 读锁 (
readLock
): 用于保护add(TimerTask)
操作。多个线程可以同时添加任务,因为这通常只涉及对TimingWheel
内部某个桶的链表进行操作,这是线程安全的。 - 写锁 (
writeLock
): 用于保护advanceClock(long)
操作。当驱动线程需要推进时钟并处理到期任务桶时,必须获得写锁。这会阻塞所有新的任务添加操作,防止在处理任务桶(flush
)时,有新任务被添加到同一个桶中,从而避免了数据不一致。
- 读锁 (
构造函数分析
// ... existing code ...
public SystemTimer(
String executorName,
long tickMs,
int wheelSize,
long startMs
) {
this.taskExecutor = Executors.newFixedThreadPool(1,
runnable -> KafkaThread.nonDaemon(SYSTEM_TIMER_THREAD_PREFIX + executorName, runnable));
this.delayQueue = new DelayQueue<>();
this.taskCounter = new AtomicInteger(0);
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
}
// ... existing code ...
构造函数初始化了所有核心组件:
- 创建一个名为
executor-
+executorName
的单线程线程池。 - 创建一个
DelayQueue
。 - 创建一个原子计数器。
- 创建
TimingWheel
的根实例,并将delayQueue
和taskCounter
传递给它。这样,TimingWheel
在添加任务时就可以直接操作这两个共享的组件。
add(TimerTask timerTask)
// ... existing code ...
public void add(TimerTask timerTask) {
readLock.lock();
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));
} finally {
readLock.unlock();
}
}
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled()) {
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
// ... existing code ...
这是向定时器添加新任务的入口。
- 首先获取读锁,允许多个线程并发添加。
- 计算任务的绝对到期时间 (
timerTask.delayMs + Time.SYSTEM.hiResClockMs()
),并创建一个TimerTaskEntry
。 - 调用私有的
addTimerTaskEntry
方法。 addTimerTaskEntry
尝试将任务添加到timingWheel
。- 如果
timingWheel.add()
返回false
,说明这个任务已经过期或者被取消了。对于未取消的已过期任务,直接提交到taskExecutor
立即执行。
advanceClock(long timeoutMs)
// ... existing code ...
public boolean advanceClock(long timeoutMs) throws InterruptedException {
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if (bucket != null) {
writeLock.lock();
try {
while (bucket != null) {
timingWheel.advanceClock(bucket.getExpiration());
bucket.flush(this::addTimerTaskEntry);
bucket = delayQueue.poll();
}
} finally {
writeLock.unlock();
}
return true;
} else {
return false;
}
}
// ... existing code ...
这是驱动时间轮前进的核心方法,通常由一个独立的 "Reaper" 线程循环调用。
- 调用
delayQueue.poll(timeoutMs, ...)
阻塞等待,直到有一个任务桶 (TimerTaskList
) 到期,或者等待超时。 - 如果成功获取到一个到期的
bucket
:- 立即获取写锁。这会阻塞所有
add
操作,确保时钟推进和任务处理的原子性。 - 进入一个
while
循环,处理所有已经到期的桶。 timingWheel.advanceClock(bucket.getExpiration())
: 将时间轮的内部时钟指针拨到当前到期桶的时间。bucket.flush(this::addTimerTaskEntry)
: 这是最关键的一步。flush
会清空这个桶,遍历其中的每一个TimerTaskEntry
。对于需要重新插入的(来自高层时间轮降级的任务),它会回调this::addTimerTaskEntry
方法,将任务重新添加到时间轮中。对于真正到期的任务,TimerTaskEntry
的run()
方法会被调用(通过taskExecutor
)。bucket = delayQueue.poll()
: 非阻塞地尝试获取下一个可能也已经到期的桶,继续循环处理。 f. 最后,释放写锁。
- 立即获取写锁。这会阻塞所有
总结
SystemTimer
是一个设计优雅的定时器实现。它通过组合 TimingWheel
、DelayQueue
和 ExecutorService
,并利用读写锁进行精细的并发控制,实现了以下目标:
- 高性能: 添加任务快(读锁),处理到期任务也高效(由
DelayQueue
驱动,无空转)。 - 线程安全: 通过读写锁分离了“添加任务”和“处理任务”两个场景,提高了并发度,同时保证了数据一致性。
- 易于使用: 封装了所有底层细节,提供了一个简单的
Timer
接口。
在 Kafka 中,像请求超时管理器 (RequestPurgatory
) 等需要大量、高效率定时任务的组件,都会依赖 SystemTimer
来提供服务。
ShutdownableThread
ShutdownableThread
是 Kafka 项目中一个非常基础且重要的工具类。它提供了一个标准的、可控的后台线程生命周期管理框架。在复杂的分布式系统中,有大量的后台线程需要执行周期性任务,并且在系统关闭时能够被优雅地停止。ShutdownableThread
正是为了解决这个问题而设计的,它封装了线程的启动、运行、优雅关闭和状态查询等通用逻辑。
ShutdownableThread
的核心设计目标是提供一个可预测、可管理的线程模板。它基于模板方法设计模式,定义了线程生命周期的骨架,并将具体的业务逻辑延迟到子类中去实现。
其核心思想是:
- 定义生命周期: 明确线程的几个关键状态:未启动、运行中、已发起关闭、已完成关闭。
- 封装关闭逻辑: 提供标准的
shutdown()
、initiateShutdown()
和awaitShutdown()
方法,使关闭操作变得简单和统一。 - 抽象业务逻辑: 将线程需要循环执行的具体工作抽象为
doWork()
方法,由子类根据业务需求填充。 - 统一异常处理: 在
run()
方法中提供了统一的try-catch-finally
结构,处理运行时可能出现的各种异常,并确保线程最终能够正确地结束和清理。
核心成员变量与状态管理
// ... existing code ...
public abstract class ShutdownableThread extends Thread {
public final String logPrefix;
protected final Logger log;
private final boolean isInterruptible;
private final CountDownLatch shutdownInitiated = new CountDownLatch(1);
private final CountDownLatch shutdownComplete = new CountDownLatch(1);
private volatile boolean isStarted = false;
// ... existing code ...
isInterruptible
: 一个布尔值,决定了在发起关闭时,是否要调用interrupt()
方法来中断线程。对于那些会响应中断异常(InterruptedException
)的阻塞操作(如sleep
,wait
,poll
),将此值设为true
可以让它们更快地从阻塞中唤醒并响应关闭信号。shutdownInitiated
: 一个CountDownLatch
,初始计数为1。当initiateShutdown()
被调用时,它的countDown()
会被触发,计数值变为0。这就像一个单向开关,一旦触发,就标志着关闭流程已经开始。isRunning()
方法就是通过检查这个latch来判断线程是否应该继续运行。shutdownComplete
: 另一个CountDownLatch
,初始计数也为1。它在run()
方法的finally
块中被触发countDown()
。这标志着线程的run
方法已经执行完毕,线程已经彻底停止。awaitShutdown()
方法就是通过等待这个latch来阻塞调用者,直到线程完全关闭。isStarted
: 一个volatile
布尔值,用于标记run()
方法是否已经开始执行。
通过这两个 CountDownLatch
,ShutdownableThread
精确地定义和追踪了线程的生命周期状态。
run()
// ... existing code ...
public void run() {
isStarted = true;
log.info("Starting");
try {
while (isRunning())
doWork();
} catch (FatalExitError e) {
shutdownInitiated.countDown();
shutdownComplete.countDown();
log.error("Stopped due to fatal error with exit code {}", e.statusCode(), e);
Exit.exit(e.statusCode());
} catch (Throwable e) {
if (isRunning())
log.error("Error due to", e);
} finally {
shutdownComplete.countDown();
}
log.info("Stopped");
}
public boolean isRunning() {
return !isShutdownInitiated();
}
// ... existing code ...
这是模板方法模式的核心。
- 启动标记: 首先设置
isStarted = true
。 - 主循环: 进入一个
while (isRunning())
循环。isRunning()
的实现是!isShutdownInitiated()
,即只要关闭流程没有被启动,循环就会一直继续。 - 调用抽象方法: 在循环体内,反复调用
doWork()
。具体的业务逻辑由子类在这个方法中实现。 - 异常处理:
FatalExitError
: 捕获这种特殊的致命错误。一旦捕获,会立即触发两个latch,记录错误日志,并调用Exit.exit()
强制退出整个JVM进程。这是处理无法恢复的严重错误的机制。Throwable
: 捕获所有其他类型的异常。如果线程仍在“运行”状态(即不是在关闭过程中发生的异常),就记录错误日志。这可以防止意外异常导致线程悄无声息地死掉。
- 确保关闭:
finally
块保证了无论run
方法是正常退出还是异常终止,shutdownComplete
这个latch最终都会被countDown()
。这确保了调用awaitShutdown()
的线程不会被永久阻塞。
shutdown()
- 优雅关闭的入口
// ... existing code ...
public void shutdown() throws InterruptedException {
initiateShutdown();
awaitShutdown();
}
public boolean initiateShutdown() {
synchronized (this) {
if (isRunning()) {
log.info("Shutting down");
shutdownInitiated.countDown();
if (isInterruptible)
interrupt();
return true;
} else
return false;
}
}
public void awaitShutdown() throws InterruptedException {
if (!isShutdownInitiated())
throw new IllegalStateException("initiateShutdown() was not called before awaitShutdown()");
else {
if (isStarted)
shutdownComplete.await();
log.info("Shutdown completed");
}
}
// ... existing code ...
shutdown()
方法是一个便捷的组合调用,它封装了标准的“两阶段关闭”模式:
initiateShutdown()
(发起关闭): 这是关闭流程的第一步。它会触发shutdownInitiated
latch,使得isRunning()
返回false
,从而让run()
方法中的while
循环在下一次检查时退出。如果线程被设置为isInterruptible
,它还会调用interrupt()
来唤醒可能处于阻塞状态的doWork()
方法。awaitShutdown()
(等待关闭完成): 这是关闭流程的第二步。调用此方法的线程会阻塞在shutdownComplete.await()
上,直到run()
方法执行完毕(即finally
块被执行),shutdownComplete
被触发。
要使用 ShutdownableThread
,开发者需要:
- 创建一个类继承自
ShutdownableThread
。 - 实现抽象的
doWork()
方法,在其中放入需要循环执行的业务逻辑。 - 在
doWork()
中,可以包含一些阻塞操作(如从队列取数据、sleep
等)。如果这些操作响应中断,那么在创建实例时将isInterruptible
设为true
会让关闭更迅速。 - 在需要启动线程的地方,创建子类实例并调用
start()
。 - 在需要关闭线程的地方,调用
shutdown()
方法。
例如,SystemTimer
的内部 "Reaper" 线程就是一个典型的例子:
class Reaper extends ShutdownableThread {
// ...
@Override
public void doWork() {
try {
// advanceClock会阻塞在delayQueue.poll上
timer.advanceClock(200L);
} catch (InterruptedException e) {
// ...
}
}
}
这个 Reaper
线程的 doWork
就是不断地调用 timer.advanceClock
,而 advanceClock
内部会阻塞等待,ShutdownableThread
框架则负责了它的整个生命周期管理。
总结
ShutdownableThread
是 Kafka 中一个非常优秀的基础设施类。它通过模板方法模式和 CountDownLatch
,为开发者提供了一个健壮、可控、可重用的后台线程实现范式,极大地简化了后台服务的开发和管理,提高了系统的稳定性和可维护性。
SystemTimerReaper
SystemTimerReaper
是一个典型的装饰器 (Decorator) 模式的应用。它的主要作用是为一个已有的 Timer
实例(通常是 SystemTimer
)增加一个自动推进时钟的后台线程。
我们之前分析过,SystemTimer
和 TimingWheel
本身是被动的,它们需要外部调用 advanceClock()
方法来驱动时间轮前进和处理到期任务。SystemTimerReaper
就是这个“外部调用者”的封装,它将一个手动的 Timer
变成一个自动运行的、有生命力的定时器服务。
- 装饰器: 它实现了
Timer
接口,从外部看,它和一个普通的Timer
没什么区别。它持有一个真正的Timer
对象 (timer
),并将add
,size
等核心方法直接委托给这个被包装的对象。 - 驱动者: 它的核心价值在于内部创建并管理一个名为
Reaper
的后台线程。这个线程的唯一职责就是周期性地调用被包装的timer
对象的advanceClock()
方法。 - 生命周期管理者: 它负责
Reaper
线程的启动和优雅关闭。
Reaper
内部类
// ... existing code ...
class Reaper extends ShutdownableThread {
Reaper(String name) {
super(name, false);
}
@Override
public void doWork() {
try {
timer.advanceClock(WORK_TIMEOUT_MS);
} catch (InterruptedException ex) {
// Ignore.
}
}
}
// ... existing code ...
这是 SystemTimerReaper
的心脏。
- 它继承了我们之前分析过的
ShutdownableThread
,从而天然具备了优雅启动和关闭的能力。 - 在构造函数中,
isInterruptible
被设置为false
。这意味着在关闭时,不会主动中断该线程。 doWork()
方法是它的核心逻辑:在一个循环中(由ShutdownableThread
的run()
方法提供),不断调用timer.advanceClock(WORK_TIMEOUT_MS)
。WORK_TIMEOUT_MS
(200L) 是advanceClock
的超时时间。这意味着Reaper
线程会阻塞在delayQueue.poll()
上,最多等待 200ms。如果有任务在这 200ms 内到期,它会立即被唤醒并处理;如果 200ms 内没有任何任务到期,它也会醒来一次,然后进入下一个循环,继续等待。这个超时机制确保了即使没有任务,时钟也能以最大 200ms 的延迟向前推进。
构造函数
// ... existing code ...
private final Timer timer;
private final Reaper reaper;
public SystemTimerReaper(String reaperThreadName, Timer timer) {
this.timer = timer;
this.reaper = new Reaper(reaperThreadName);
this.reaper.start();
}
// ... existing code ...
构造函数非常直接:
- 接收一个要被包装的
Timer
实例和一个线程名。 - 创建一个
Reaper
线程实例。 - 立即启动
Reaper
线程。这意味着一旦SystemTimerReaper
被创建,后台的时钟推进就开始自动工作了。
接口方法的委托
// ... existing code ...
@Override
public void add(TimerTask timerTask) {
timer.add(timerTask);
}
@Override
public boolean advanceClock(long timeoutMs) throws InterruptedException {
return timer.advanceClock(timeoutMs);
}
@Override
public int size() {
return timer.size();
}
// ... existing code ...
这几个方法完美体现了装饰器模式。SystemTimerReaper
自身不处理这些逻辑,而是简单地将调用转发给内部持有的 timer
对象。
close()
- 优雅关闭
// ... existing code ...
@Override
public void close() throws Exception {
reaper.initiateShutdown();
// Improve shutdown time by waking up the reaper thread
// blocked on poll by sending a no-op.
timer.add(new TimerTask(0) {
@Override
public void run() {}
});
reaper.awaitShutdown();
timer.close();
}
// ... existing code ...
close()
方法的实现非常精巧,展示了如何优雅地关闭一个可能处于阻塞状态的后台线程:
reaper.initiateShutdown()
: 调用ShutdownableThread
的方法,发起关闭流程。这会让Reaper
线程的isRunning()
返回false
,使其在下一次循环检查时退出。timer.add(new TimerTask(0) { ... })
: 这是关键的唤醒技巧。Reaper
线程此时可能正阻塞在timer.advanceClock(200)
,即delayQueue.poll(200)
上。为了不让它傻等 200ms,这里添加了一个延迟为 0 的空任务 (TimerTask(0)
)。这个任务会立即被放入delayQueue
,从而立刻唤醒正在poll()
的Reaper
线程。- 被唤醒的
Reaper
线程会处理这个空任务,然后检查while(isRunning())
条件,发现为false
,于是退出循环,run()
方法结束。 reaper.awaitShutdown()
: 主线程在这里等待,直到Reaper
线程完全终止。timer.close()
: 最后,关闭被包装的timer
对象,释放其内部资源(如taskExecutor
线程池)。
总结
SystemTimerReaper
是一个简单而强大的类,它通过装饰器模式和 ShutdownableThread
框架,将一个被动的、需要手动驱动的 SystemTimer
包装成了一个全自动、有完整生命周期管理的定时器服务。
- 解耦: 它将“任务的存储和执行逻辑” (
SystemTimer
) 与“驱动时钟前进的策略” (Reaper
线程)清晰地分离开来。 - 易用性: 用户只需要创建一个
SystemTimerReaper
实例,就可以得到一个开箱即用的、自动运行的定时器,无需关心底层的线程管理和时钟推进细节。 - 健壮性: 提供了优雅的关闭逻辑,确保后台线程能够被快速、干净地停止,避免了资源泄露。
在 Kafka 的代码中,当需要一个“活”的、自动运行的定时器时(例如用于管理延迟操作的 DelayedOperationPurgatory
),通常就会使用 SystemTimerReaper
(或者直接使用 SystemTimer
并自己管理一个类似的Reaper线程)来构建服务。
Kafka 的时间轮 (TimingWheel
) 在工程中的具体应用
Kafka 的时间轮是其内部一个高性能、低开销的定时任务调度器。任何需要在未来某个时间点执行一个操作,或者需要为某个事件设置超时时间的场景,都可能使用到它。由于 Kafka 是一个高并发、高吞吐的系统,需要同时管理成千上万个定时事件(比如客户端请求的超时),使用传统 java.util.Timer
或 ScheduledThreadPoolExecutor
会因为锁竞争或任务管理开销巨大而成为性能瓶颈。时间轮 O(1)
的任务添加和删除(指移动指针)复杂度,使其成为理想的解决方案。
以下是 Kafka 中使用时间轮最核心和最典型的几个地方:
请求超时管理 (Request Purgatory - 请求炼狱)
这是时间轮最经典、最重要的应用场景。在 Kafka Broker 中,很多客户端请求并不能立即得到满足,需要被延迟处理 (Delayed),直到某个条件满足或者超时。这些被延迟的请求就被放入一个叫做 "Purgatory" (炼狱) 的组件中进行管理。DelayedOperationPurgatory
类就是这个机制的核心实现,它内部就依赖于 SystemTimer
(也就是时间轮)。
典型的延迟操作包括:
DelayedProduce
(延迟生产): 当 Producer 设置acks=all
或acks=-1
时,Broker 在收到消息后,必须等待所有 ISR (In-Sync Replicas) 都同步完成这条消息后才能响应 Producer。这个等待过程就是一个延迟操作。如果等待时间超过了request.timeout.ms
,则请求超时,时间轮会触发这个DelayedProduce
操作,使其强制完成并给客户端返回一个超时错误。DelayedFetch
(延迟拉取): Consumer 拉取消息时可以设置fetch.min.bytes
和fetch.max.wait.ms
。如果 Broker 当前的数据不满足fetch.min.bytes
,它不会立即返回空数据,而是会将这个 Fetch 请求延迟,直到有足够的数据或者等待时间超过了fetch.max.wait.ms
。这个等待过程就是由时间轮来管理的。DelayedDeleteRecords
(延迟删除记录): 删除记录的请求也需要等待操作传播到 high watermark 之后才能完成,这个等待也是一个延迟操作。DelayedJoin
和DelayedSyncGroup
: 在 Consumer Group Rebalance 过程中,协调者(Coordinator)等待所有成员加入 (JoinGroup
) 或同步状态 (SyncGroup
) 的过程也是延迟操作,有各自的超时时间,由时间轮管理。
工作流程:
- 一个不能立即满足的请求(如 Produce 或 Fetch)被封装成一个
DelayedOperation
对象。 - 这个对象被提交到
DelayedOperationPurgatory
。 - Purgatory 使用
SystemTimer.add(operation)
将这个操作作为一个TimerTask
添加到时间轮中,超时时间就是请求的超时时间。 - 如果在超时之前,操作的完成条件被满足(例如,ISR 同步完成),则操作会主动执行,并调用
cancel()
方法将自己从时间轮中移除。 - 如果操作的条件一直未满足,时间轮的指针最终会扫到这个任务,触发其
run()
方法,强制完成该操作(通常是返回一个超时响应给客户端)。
消费者组心跳和会话超时 (Session Timeout)
在消费者组中,每个成员都需要定期向组协调者(Group Coordinator)发送心跳,以表明自己还活着。协调者需要为每个消费者维护一个会话(Session)。
- 协调者会为每个消费者的会话启动一个超时定时器。这个定时器的时长就是
session.timeout.ms
。 - 每当协调者收到一个消费者的心跳时,它就会重置(可以理解为取消旧的,添加一个新的)这个消费者对应的会话超时定时器。
- 如果有某个消费者的定时器到期了(意味着协调者在
session.timeout.ms
内没有收到它的心跳),协调者就会认为这个消费者已经宕机,将其从消费组中移除,并触发一次 Rebalance。
在一个大型的 Kafka 集群中,协调者可能需要同时管理成千上万个消费者的会话,使用时间轮来管理这些海量的会话超时定时器,效率极高。
事务超时管理 (Transaction Timeout)
Kafka 支持事务(Transactions)。当 Producer 开启一个事务后,它必须在 transaction.timeout.ms
配置的时间内完成这个事务(提交或中止)。
- 事务协调者(Transaction Coordinator)在收到
InitProducerId
或AddPartitionsToTxn
请求后,会为这个事务启动一个超时定时器。 - 如果在这个时间内,事务没有被提交或中止,时间轮会触发这个定时器。
- 事务协调者会主动发起中止事务的操作,确保事务不会无限期地挂起,从而避免锁住资源。
总结
总而言之,Kafka 的时间轮 (TimingWheel
) 是其高性能调度系统的基石,主要用在以下场景:
场景 | 具体应用 | 为什么使用时间轮 |
---|---|---|
请求处理 | DelayedOperationPurgatory 管理各种延迟请求,如 Produce, Fetch, JoinGroup 等。 | Broker 需要同时挂起成千上万个客户端请求,时间轮能以极低开销管理这些请求的超时。 |
成员管理 | Group Coordinator 管理消费者的会话超时。 | 需要为海量消费者维护心跳计时器,判断其死活。 |
事务管理 | Transaction Coordinator 管理事务的超时。 | 确保事务不会无限期挂起,自动中止超时的事务。 |
通过将这些大量的、生命周期短暂的定时任务交给时间轮管理,Kafka 避免了为每个任务创建一个线程或使用重量级调度器所带来的巨大性能开销,这是其能够支撑海量客户端并保持高吞吐和低延迟的关键设计之一。
Time
接口
Time
接口在 Kafka 中扮演着一个非常基础且关键的角色:时间的抽象。它的核心目的是将 Kafka 内部所有依赖于“时间”的代码与具体的时钟实现解耦。这带来了两个巨大的好处:
- 可测试性 (Testability): 这是最重要的目的。在生产环境中,代码使用
System.currentTimeMillis()
或System.nanoTime()
来获取真实时间。但在单元测试或集成测试中,我们经常需要控制时间的流逝,比如模拟超时、验证延迟任务是否在预期时间执行等。通过Time
接口,我们可以在测试时注入一个MockTime
的实现,从而可以随心所欲地“快进”时间,而不需要真的Thread.sleep()
来等待,极大地提高了测试的效率和确定性。 - 统一性 (Uniformity): 它为整个 Kafka 项目提供了一个统一的获取时间的方式。所有需要时间的代码都应该通过这个接口,而不是各自去调用
System.currentTimeMillis()
,这使得代码更加规范和易于维护。
接口定义和语义
/**
* An interface abstracting the clock to use in unit testing classes that make use of clock time.
*
* Implementations of this class should be thread-safe.
*/
public interface Time {
// ...
}
接口的 Javadoc 明确指出了其核心目的:“一个抽象时钟的接口,用于在单元测试中使用时钟时间的类”。同时,它要求所有实现都必须是线程安全的。
静态字段
// ... existing code ...
public interface Time {
Time SYSTEM = SystemTime.getSystemTime();
// ... existing code ...
Time SYSTEM = SystemTime.getSystemTime()
:- 语义: 这提供了一个默认的、随时可用的
Time
实例,它代表了真实的系统时间。 - 实现:
SystemTime
是Time
接口的一个实现,其内部方法直接委托给System.currentTimeMillis()
和System.nanoTime()
。 - 用法: 在生产代码中,当不需要模拟时间时,通常会直接使用
Time.SYSTEM
。例如long now = Time.SYSTEM.milliseconds();
。
- 语义: 这提供了一个默认的、随时可用的
核心抽象方法
这些方法定义了 Time
接口的基本能力,必须由实现类提供具体逻辑。
long milliseconds()
:- 语义: 返回当前的“墙上时钟时间”(wall-clock time),单位是毫秒。这个时间通常等同于自 Unix 纪元(1970-01-01T00:00:00Z)以来的毫秒数。
- 实现:
SystemTime
中对应System.currentTimeMillis()
;MockTime
中对应一个可手动控制的内部变量。
long nanoseconds()
:- 语义: 返回 JVM 的高精度时间源的当前值,单位是纳秒。这个值只用于测量时间间隔,它的绝对值没有意义,甚至可能是负数。它与任何系统或墙上时钟时间都无关。
- 实现:
SystemTime
中对应System.nanoTime()
;MockTime
中也对应一个可手动控制的内部变量。
void sleep(long ms)
:- 语义: 使当前线程“睡眠”指定的毫秒数。
- 实现:
SystemTime
中对应Thread.sleep(ms)
;MockTime
中则通常是简单地将内部时钟推进ms
毫秒,并不会真的阻塞线程。
void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs)
:- 语义: 这是一个对
Object.wait()
的封装。它会在一个对象的监视器上等待,直到condition
变为true
或者达到了deadlineMs
(绝对时间戳)指定的超时时间。这避免了直接使用Object.wait(timeout)
时对系统时间的隐式依赖,使得等待操作也可以被MockTime
控制。 Supplier<Boolean> condition
: 一个函数,用于在被唤醒后检查等待的条件是否已满足。deadlineMs
: 超时的绝对时间点。
- 语义: 这是一个对
默认方法 (Default Methods)
Java 8 接口的默认方法为 Time
接口提供了很多便利的工具函数。
long hiResClockMs()
:- 语义: 将
nanoseconds()
返回的高精度时间转换为毫秒。主要用于需要毫秒级精度但又想利用高精度时钟源的场景。
- 语义: 将
Timer timer(long timeoutMs)
和Timer timer(Duration timeout)
:- 语义: 创建并返回一个与当前
Time
实例绑定的Timer
对象。Timer
是一个辅助类,用于方便地跟踪超时。 - 实现: 它直接
new Timer(this, timeoutMs)
,将当前的Time
实例注入到Timer
中。这样,Timer
的所有时间计算(如isExpired()
,remainingMs()
)都会使用这个Time
实例,从而也实现了可测试性。
- 语义: 创建并返回一个与当前
<T> T waitForFuture(Future<T> future, long deadlineNs)
:- 语义: 这是一个非常有用的工具方法,用于等待一个
Future
完成,但带有基于nanoseconds()
的超时控制。 - 实现逻辑:
- 在一个
while(true)
循环中不断检查。 - 首先,获取当前纳秒时间
nowNs
,如果已经超过了最后期限deadlineNs
,就抛出TimeoutException
。 - 计算剩余的等待时间
deltaNs = deadlineNs - nowNs
。 - 调用
future.get(deltaNs, TimeUnit.NANOSECONDS)
,在剩余时间内等待结果。 - 如果
future.get
正常返回,则方法成功返回结果。 - 如果
future.get
抛出TimeoutException
,则捕获它,然后循环继续。下一次循环开始时,会再次检查是否真的超过了deadlineNs
,如果还没有,会用更新后的、更短的剩余时间再次尝试future.get
。这种循环处理了future.get
可能因为其他原因(虚假唤醒等)提前返回的场景,确保了等待的健壮性。
- 在一个
- 语义: 这是一个非常有用的工具方法,用于等待一个
总结
Time
接口是 Kafka 中一个典型的依赖注入和面向接口编程的范例。它通过将时间抽象化,成功地将业务逻辑与底层时间源解耦。
- 核心语义: 提供一个统一的、可替换的时间源。
- 主要实现:
SystemTime
: 用于生产环境,使用真实的系统时钟。MockTime
: 用于测试环境,允许开发者手动控制时间的流逝。
- 设计价值: 极大地提升了代码的可测试性,使得对超时、延迟、调度等与时间相关的复杂逻辑进行单元测试变得简单可靠。同时,它也统一了项目内部获取时间的方式,提高了代码的规范性。
SystemTime
SystemTime
是 Time
接口在生产环境中的标准、具体实现。它的作用非常直接:将 Time
接口中定义的抽象时间操作(如获取毫秒、纳秒、睡眠等)桥接到 Java 虚拟机和操作系统提供的真实系统时钟上。
// ... existing code ...
/**
* A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
* of this class.
*/
class SystemTime implements Time {
// ... existing code ...
class SystemTime implements Time
: 这表明SystemTime
提供了Time
接口所要求的所有功能的具体实现。- 包级私有 (
class SystemTime
): 注意到这个类没有public
修饰符,这意味着它只能在org.apache.kafka.common.utils
包内部被直接访问。这是一个很好的封装实践。外部代码不应该、也不需要直接创建SystemTime
的实例,而应该通过Time.SYSTEM
这个公共静态字段来获取它。Javadoc 注释也明确强调了这一点:“UseTime.SYSTEM
instead of creating an instance of this class.”
单例模式实现
// ... existing code ...
class SystemTime implements Time {
private static final SystemTime SYSTEM_TIME = new SystemTime();
public static SystemTime getSystemTime() {
return SYSTEM_TIME;
}
// ... existing code ...
private SystemTime() {
}
}
SystemTime
采用了饿汉式单例模式来实现:
private static final SystemTime SYSTEM_TIME = new SystemTime()
: 在类加载时就创建了一个唯一的、final
的静态实例。这保证了在整个 JVM 生命周期中,SYSTEM_TIME
对象只有一个。private SystemTime()
: 构造函数被声明为private
,这阻止了任何外部代码通过new SystemTime()
来创建新的实例。public static SystemTime getSystemTime()
: 提供一个公共的静态方法来获取这个唯一的实例。这个方法主要被Time
接口中的Time SYSTEM = SystemTime.getSystemTime();
这行代码使用,从而将单例暴露出去。
这种设计是合理的,因为系统时钟本身就是全局唯一的资源,为它创建一个单例对象既节省了资源,也符合其语义。
SystemTime
的核心就是对 Time
接口中方法的直接实现。
milliseconds()
和 nanoseconds()
// ... existing code ...
@Override
public long milliseconds() {
return System.currentTimeMillis();
}
@Override
public long nanoseconds() {
return System.nanoTime();
}
// ... existing code ...
milliseconds()
: 直接调用System.currentTimeMillis()
,返回标准的“墙上时钟”时间。nanoseconds()
: 直接调用System.nanoTime()
,返回高精度计时器的时间,用于测量时间间隔。
这是最直接的实现,将接口的抽象调用映射到了 Java 的原生方法。
sleep(long ms)
// ... existing code ...
@Override
public void sleep(long ms) {
Utils.sleep(ms);
}
// ... existing code ...
sleep(long ms)
: 它没有直接调用Thread.sleep(ms)
,而是委托给了同一个包下的Utils.sleep(ms)
。Utils.sleep
内部实际上就是调用了Thread.sleep
,这是一个常见的代码组织方式,将一些通用的静态工具方法集中在Utils
类中。这个方法会真实地阻塞当前线程。
waitObject(...)
// ... existing code ...
@Override
public void waitObject(Object obj, Supplier<Boolean> condition, long deadlineMs) throws InterruptedException {
synchronized (obj) {
while (true) {
if (condition.get())
return;
long currentTimeMs = milliseconds();
if (currentTimeMs >= deadlineMs)
throw new TimeoutException("Condition not satisfied before deadline");
obj.wait(deadlineMs - currentTimeMs);
}
}
}
// ... existing code ...
这是对 Object.wait(long timeout)
的一个健壮封装,用于处理虚假唤醒 (spurious wakeup) 和超时。
synchronized (obj)
: 首先获取对象的监视器锁,这是调用obj.wait()
的前提。while (true)
: 使用循环来处理虚假唤醒。Object.wait()
可能会在没有被notify()
或notifyAll()
的情况下返回,这就是虚假唤醒。通过循环,即使发生了虚假唤醒,代码也会重新检查condition
。if (condition.get()) return;
: 在循环的开始,首先检查等待的条件是否已经满足。如果满足,则方法直接返回。long currentTimeMs = milliseconds();
: 获取当前时间。if (currentTimeMs >= deadlineMs) throw new TimeoutException(...)
: 检查是否已经超时。如果当前时间已经超过或等于最后期限,就抛出自定义的TimeoutException
。obj.wait(deadlineMs - currentTimeMs);
: 计算剩余的等待时间,并调用obj.wait()
。这会让当前线程释放锁并进入等待状态,直到被唤醒或等待超时。
这个实现确保了等待操作要么在条件满足时返回,要么在精确的截止时间后因超时而失败,非常稳健。
总结
SystemTime
是 Time
接口最基础、最直接的实现。它作为 Kafka 生产环境中的默认时间源,通过单例模式提供了对系统真实时钟的访问。它的代码简单明了,核心逻辑就是将 Time
接口的调用委托给 Java 的标准库方法 (System.currentTimeMillis
, System.nanoTime
, Thread.sleep
, Object.wait
)。
它与 MockTime
形成了鲜明的对比,MockTime
用于测试,时间是可控的、虚拟的;而 SystemTime
用于生产,时间是不可控的、真实的。正是这种通过接口进行隔离的设计,使得 Kafka 中大量与时间相关的复杂逻辑能够被轻松地、确定地进行测试。
Timer
首先,需要特别说明的是,在 Kafka 的代码库中存在两个都名为 Timer
的类/接口,它们的作用不同:
org.apache.kafka.common.utils.Timer
(当前分析的类): 这是一个位于clients
模块的工具类。它的主要作用是帮助管理和跟踪一个单一的、有超时的操作。它像一个“倒计时器”或“秒表”,用于简化那些需要在给定时间内完成的阻塞操作的逻辑。org.apache.kafka.server.util.timer.Timer
(接口): 这是一个位于server-common
模块的接口。它定义了一个任务调度器的行为,比如时间轮 (SystemTimer
)。它的职责是管理多个TimerTask
,并在它们各自的延迟时间到达后执行它们。
我们现在聚焦于前者:org.apache.kafka.common.utils.Timer
。
这个 Timer
类是一个辅助工具,旨在解决一个常见的编程问题:一个有总超时限制的高级操作,其内部可能包含了多个同样需要设置超时的小步骤。
核心思想: 创建一个代表总超时的 Timer
对象,然后将这个对象在各个子步骤中传递。在每个子步骤中,可以通过查询 timer.remainingMs()
来获取当前还剩余多少时间,从而为子步骤设置一个合理的、不会超出总时限的超时时间。
关键设计点 - 时间缓存:
这个 Timer
并非每次查询都去调用系统时间。它内部缓存了一个 currentTimeMs
。用户必须显式调用 update()
方法来用底层 Time
对象刷新这个缓存时间。这样做的好处是:
- 性能: 避免了在紧凑的循环中频繁调用
System.currentTimeMillis()
,减少了系统调用的开销。 - 控制权: 调用者可以精确控制何时更新计时器状态,这在
NetworkClient.poll
这样的场景中非常有用。
如 Javadoc 中的例子所示:
Time time = Time.SYSTEM;
Timer timer = time.timer(500); // 创建一个总超时为500ms的Timer
// 只要条件不满足且计时器没过期
while (!conditionSatisfied() && timer.notExpired()) {
// 使用剩余时间作为poll的超时时间
client.poll(timer.remainingMs(), timer.currentTimeMs());
// poll返回后,手动更新计时器的时间
timer.update();
}
成员变量
// ... existing code ...
public class Timer {
private final Time time;
private long startMs;
private long currentTimeMs;
private long deadlineMs;
private long timeoutMs;
// ... existing code ...
private final Time time
: 持有一个Time
接口的实例。这使得Timer
的行为可以被测试,通过注入MockTime
就可以控制时间的流逝。private long startMs
: 计时器开始或上次重置时的时间戳。private long currentTimeMs
: 缓存的当前时间。这个值只通过update()
或sleep()
方法更新。所有关于剩余时间、已过时间的计算都基于这个缓存值。private long deadlineMs
: 计时器到期的绝对时间戳。deadlineMs = startMs + timeoutMs
。private long timeoutMs
: 设置的总超时时长。
构造与初始化
// ... existing code ...
Timer(Time time, long timeoutMs) {
this.time = time;
update();
reset(timeoutMs);
}
// ... existing code ...
- 构造函数是包级私有的。外部用户应该通过
Time
接口的工厂方法time.timer(long timeoutMs)
来创建实例。 update()
: 首先调用update()
,将currentTimeMs
初始化为time.milliseconds()
的当前值。reset(timeoutMs)
: 然后用初始超时值设置startMs
和deadlineMs
。
时间更新与状态查询
update()
/update(long currentTimeMs)
:- 这是
Timer
手动控制的核心。update()
从底层的Time
对象获取最新时间来更新currentTimeMs
。 this.currentTimeMs = Math.max(currentTimeMs, this.currentTimeMs);
这一行保证了时间的单调性。即使系统时钟发生回拨,Timer
内部的缓存时间也绝不会倒退,这增强了计时的稳定性。
- 这是
isExpired()
/notExpired()
:- 基于缓存的
currentTimeMs
和deadlineMs
进行比较,判断是否超时。
- 基于缓存的
remainingMs()
:- 返回
deadlineMs - currentTimeMs
,即剩余时间。这是将总超时分解给子步骤的关键方法。结果最小为 0。
- 返回
elapsedMs()
:- 返回
currentTimeMs - startMs
,即从开始或上次重置后经过的时间。
- 返回
重置 (Reset)
reset(long timeoutMs)
:- 使用当前缓存的
currentTimeMs
作为新的startMs
,并根据新的timeoutMs
计算新的deadlineMs
。 - 注意:这个方法不会自动更新
currentTimeMs
。所以,如果想基于最新的时间来重置,正确的模式是先update()
再reset()
,或者直接使用下面的updateAndReset
。
- 使用当前缓存的
updateAndReset(long timeoutMs)
:- 一个便利方法,它组合了
update()
和reset(timeoutMs)
,是重置计时器的常用方式。
- 一个便利方法,它组合了
sleep(long durationMs)
// ... existing code ...
public void sleep(long durationMs) {
long sleepDurationMs = Math.min(durationMs, remainingMs());
time.sleep(sleepDurationMs);
update();
}
}
- 这个方法非常巧妙。它不会无脑地
sleep
指定的durationMs
。 - 它会先计算出
durationMs
和remainingMs()
中的较小值。这意味着sleep
的时间绝不会超过计时器的剩余时间。 sleep
结束后,它会自动调用update()
来刷新缓存时间。
总结
org.apache.kafka.common.utils.Timer
是一个精巧的、用于管理单次操作超时的客户端工具类。它与 server-common
中的 Timer
接口(任务调度器)有着本质的区别。
它的核心设计哲学是:
- 依赖注入
Time
: 实现了与具体时钟的解耦,使其具有优秀的可测试性。 - 手动时间缓存: 通过要求调用者显式调用
update()
,在提供了性能优化的同时,也赋予了调用者对计时过程的精细控制。 - 保证时间单调性: 即使系统时钟回拨,内部计时也不会出错,保证了健壮性。
- 简化复合超时逻辑: 使得将一个大的超时分解到多个小步骤中的逻辑变得简单和安全。
在 Kafka 客户端代码中,当需要执行网络请求、等待响应等有明确超时限制的操作时,这个 Timer
类被广泛使用,是保证客户端健壮性的一个重要工具。