【Kafka源码解读和使用指南】第48篇:Kafka时间轮(TimingWheel)源码解析——定时器的艺术
上一篇【第47篇】Kafka延迟操作DelayedOperation源码解析——优雅处理等待响应下一篇【第49篇】Kafka副本机制源码解析一——副本的世界观摘要上篇我们讲了DelayedOperation但没有深究它背后的定时器是怎么工作的。Kafka没有用JDK自带的ScheduledThreadPool或DelayQueue而是自己实现了一套层级时间轮Hierarchical TimingWheel——因为Broker上可能同时挂载数万个延迟任务传统的堆/队列定时器在这种量级下会严重拖垮性能。本文从为什么不用JDK定时器讲起手绘时间轮的完整数据结构逐行拆解add()、advanceClock()、reinsert()三大核心操作的源码最后解析SystemTimer如何把时间轮封装成一个优雅的定时器服务。读完你会发现原来定时器也可以这么写一、为什么不用 JDK 自带的定时器Kafka Broker 需要同时管理海量定时任务。一个中型集群的Broker上可能有上万个未完成的生产请求每个都是一个DelayedProduce和上万个等待数据的拉取请求DelayedFetch每个都需要超时管理。我们先看看常见定时器的性能特点【定时器实现方案对比】 方案A: java.util.Timer最小堆 ┌──────────────────────┐ │ 插入: O(log N) │ ← 每次插入都要调整堆 │ 删除: O(log N) │ ← 取消了要删除 │ 推进: O(1) │ ← 直接拿堆顶 │ │ │ 问题: 单线程线程 │ │ 安全差不适合 │ │ 高并发场景 │ └──────────────────────┘ 方案B: DelayQueuePriorityBlockingQueue ┌──────────────────────┐ │ 插入: O(log N) │ ← 加锁 堆调整 │ 删除: O(log N) │ ← 加锁 堆调整 │ 推进: O(1) │ ← poll() 取堆顶 │ │ │ 问题: 上万任务时 │ │ O(log N) 也扛 │ │ 不住 │ └──────────────────────┘ 方案C: Kafka TimingWheel时间轮 ┌──────────────────────┐ │ 插入: O(1) ★ │ ← 直接定位槽位 │ 删除: O(1) ★ │ ← 从链表移除 │ 推进: O(1) ★ │ ← 移动指针即可 │ │ │ 代价: 空间换时间 │ │ 需要预分配槽位 │ └──────────────────────┘对比维度Timer/DelayQueue堆Kafka TimingWheel插入复杂度O(log N)O(1)删除复杂度O(log N)O(1)推进复杂度O(1)取堆顶O(1)移动指针10万任务时的插入约 log₂(100000) ≈ 17次比较1次哈希定位内存开销较小较大预分配槽数组适用场景任务数少1000海量任务万级以上一句话总结JDK定时器的O(log N)在小规模下完全够用但在Kafka这种数万任务同频次增删的场景下日志次方的开销累积起来就是性能瓶颈。时间轮用空间换时间把增删都做到了O(1)。二、单层时间轮用时钟理解核心思想时间轮的思想其实非常简单——想象一个钟表【单层时间轮结构8个槽位每个槽代表1ms】 时间轮总跨度 8mstickMs1ms, wheelSize8 ┌─────────────────────────────────┐ │ TimingWheel │ │ │ │ 槽位0 槽位1 槽位2 │ │ ┌────┐ ┌────┐ ┌────┐ │ │ │ │ │ T2 │ │ │ │ │ └────┘ └────┘ └────┘ │ │ │ │ 槽位7 槽位6 ... 槽位3 │ │ ┌────┐ ┌────┐ ┌────┐ │ │ │ │ │ T1 │ │ │ │ │ └────┘ └────┘ └────┘ │ │ ↑ │ │ 当前时间6ms │ │ 指针指向槽位6 │ └─────────────────────────────────┘ 任务T1延迟2ms放入 (62) % 8 槽位0 任务T2延迟4ms放入 (64) % 8 槽位2 当指针走到槽位0时T1到期 → 执行 当指针走到槽位2时T2到期 → 执行核心公式槽位编号 (当前时间 延迟毫秒) / tickMs % wheelSize (currentTime delayMs) / tickMs % wheelSize但单层时间轮有个致命问题——时间跨度有限。如果wheelSize20, tickMs1ms最多只能覆盖20ms的范围。那我要一个30ms后执行的任务怎么办答案层级时间轮。三、层级时间轮像水表一样层层递进Kafka的做法是把时间轮分层就像水表上的千位轮走一圈百位轮才走一格【层级时间轮结构——类比水表】 ┌──────────────────────┐ │ 第3层百位轮 │ tickMs100ms, wheelSize20 │ 覆盖: 100~2000ms │ 走一格 100ms └──────────┬───────────┘ │ 走一圈(2000ms) → 向第4层进位 ┌──────────▼───────────┐ │ 第2层十位轮 │ tickMs10ms, wheelSize20 │ 覆盖: 10~200ms │ 走一格 10ms └──────────┬───────────┘ │ 走一圈(200ms) → 向第3层进位 ┌──────────▼───────────┐ │ 第1层个位轮 │ tickMs1ms, wheelSize20 │ 覆盖: 1~20ms │ 走一格 1ms └──────────────────────┘ ↓ 这个轮叫 overflowWheel溢出轮 层级递进规则 - 任务延迟 ≤ 当前层最大跨度 → 放入当前层 - 任务延迟 当前层最大跨度 → 放入上层overflowWheel - 上层指针走一格 → 该格的定时任务降级到下一层 示例 30ms延迟的任务 → 第1层最多20ms放不下 → 第2层最多200ms放得下 → 放入第2层 → 第2层指针走到 (30/10)%20 槽位3 → 指针走到槽位3时任务降级到第1层四、TimingWheel 源码解析 —— add/advanceClock/reinsert现在看源码。Kafka的时间轮实现在TimingWheel.scala/** * Kafka 层级时间轮实现 * 位置core/src/main/scala/kafka/utils/timer/TimingWheel.scala */nonthreadsafeprivate[timer]classTimingWheel(tickMs:Long,// 每个槽位代表的毫秒数基本时间粒度wheelSize:Int,// 槽位数量startMs:Long,// 时间轮的起始时间戳taskCounter:AtomicInteger,// 全局任务计数器queue:DelayQueue[TimerTaskList]// 用于推进时间的延迟队列){// 当前时间轮的总跨度 tickMs * wheelSizeprivate[this]val intervaltickMs*wheelSize// 槽位数组每个槽位是一个 TimerTaskList双向链表private[this]val bucketsArray.tabulate[TimerTaskList](wheelSize){_newTimerTaskList(taskCounter)}// 当前时间已推进到的时间点private[this]varcurrentTimestartMs-(startMs%tickMs)// 上层时间轮当任务延迟超过本层跨度时使用volatileprivate[this]varoverflowWheel:TimingWheelnull// 核心方法 1: add() /** * 添加一个定时任务到时间轮 * * return false 表示任务已过期应立即执行 * true 表示成功添加到时间轮 */defadd(timerTaskEntry:TimerTaskEntry):Boolean{val expirationtimerTaskEntry.expirationMs// 任务的过期时间戳if(timerTaskEntry.cancelled()){// 任务已被取消直接忽略false}elseif(expirationcurrentTimetickMs){// ★ 任务已经过期或即将在下一个tick过期// 返回 false 让调用者立即执行false}elseif(expirationcurrentTimeinterval){// ★ 任务在本层时间轮的覆盖范围内// 计算应该放入哪个槽位val virtualId(expiration/tickMs).toInt val bucketbuckets(virtualId%wheelSize.toLong)bucket.add(timerTaskEntry)// 如果这个槽位刚被激活之前是空的设置过期时间if(bucket.setExpiration(virtualId*tickMs)){// 放入 DelayQueue 来驱动时间推进queue.offer(bucket)}true}else{// ★ 任务延迟超过本层范围 → 交给上层时间轮if(overflowWheelnull){// 懒创建上层时间轮addOverflowWheel()}overflowWheel.add(timerTaskEntry)}}// 核心方法 2: advanceClock() /** * 推进时间轮的当前时间。 * 这个方法由 SystemTimer 的驱动线程调用。 * * param timeMs 要推进到的时间点 */defadvanceClock(timeMs:Long):Unit{if(timeMscurrentTimetickMs){// 将当前时间向下对齐到 tickMs 的倍数currentTimetimeMs-(timeMs%tickMs)// ★ 同时推进上层时间轮递归if(overflowWheel!null){overflowWheel.advanceClock(currentTime)}}}// 辅助方法: addOverflowWheel() private[this]defaddOverflowWheel():Unit{// 上层时间轮的 tickMs 当前层的 tickMs * wheelSize// 即上层时间轮每个槽代表的时间粒度更粗overflowWheelnewTimingWheel(tickMsinterval,// ★ 关键上层tick 本层intervalwheelSizewheelSize,startMscurrentTime,taskCountertaskCounter,queuequeue// ★ 共享同一个 DelayQueue)}}add() 方法的决策树【add(task) 决策流程】 add(task) 被调用 │ ├─ 任务已取消 │ └─ YES → 返回 false忽略 │ ├─ expiration currentTime tickMs │ └─ YES → 返回 false已过期立即执行 │ ├─ expiration currentTime interval │ └─ YES → 放入本层槽位 buckets[idx] │ → 如果槽位刚激活加入 queue推进用 │ → 返回 true │ └─ NO → 延迟超出本层范围 → 懒创建 overflowWheel → overflowWheel.add(task) ← 递归五、TimerTaskList —— 槽位的双向链表实现每个槽位不是简单存一个任务而是一个带过期时间的双向链表/** * 时间轮槽位 —— 存储同一时间点到期的一组任务 * 位置core/src/main/scala/kafka/utils/timer/TimerTaskList.scala */classTimerTaskList(taskCounter:AtomicInteger)extendsDelayed{// 哨兵节点简化边界处理privateval rootnewTimerTaskEntry(null,-1)root.nextroot root.prevroot// 这个槽位的过期时间privateval expirationnewAtomicLong(-1L)// 设置过期时间只有在槽位为空时才能设置defsetExpiration(expirationMs:Long):Boolean{expiration.getAndSet(expirationMs)!expirationMs}// 添加任务到链表尾部defadd(timerTaskEntry:TimerTaskEntry):Unit{vardonefalsewhile(!done){// 先移除任务原有的链表关系可能之前在别的槽位timerTaskEntry.remove()synchronized{timerTaskEntry.listthis// 标记属于这个列表val tailroot.prev timerTaskEntry.nextroot timerTaskEntry.prevtail tail.nexttimerTaskEntry root.prevtimerTaskEntry taskCounter.incrementAndGet()donetrue}}}// 执行该槽位的所有到期任务defflush(f:TimerTaskEntryUnit):Unit{synchronized{varheadroot.nextwhile(head ne root){remove(head)// 从链表移除if(!head.cancelled){f(head)// 执行回调最终会调用 task.run()}headroot.next}expiration.set(-1L)// 重置过期时间}}// DelayQueue 接口获取过期时间override defgetDelay(unit:TimeUnit):Long{unit.convert(expiration.get()-System.currentTimeMillis(),MILLISECONDS)}}关键设计点TimerTaskList 实现了Delayed接口—— 这样它就能放入DelayQueue。系统不需要轮询每个槽位而是用DelayQueue.take()阻塞等待最近一个到期的槽位。哨兵节点root—— 避免空链表时的空指针判断。flush()方法—— 当时间指针走到这个槽位时一次性执行该槽位的所有任务。六、SystemTimer —— 把时间轮封装成完整的定时器服务SystemTimer是时间轮的操盘手它负责驱动时间推进/** * 系统定时器 —— 时间轮的上层封装 * 位置core/src/main/scala/kafka/utils/timer/SystemTimer.scala */classSystemTimer(executorName:String,tickMs:Long1,// 每个槽 1mswheelSize:Int20,// 每层 20 个槽startMs:LongSystem.currentTimeMillis())extendsTimerwithLogging{// 底层时间轮初始只有一层private[this]val timingWheelnewTimingWheel(tickMstickMs,wheelSizewheelSize,startMsstartMs,taskCountertaskCounter,queuedelayQueue)// DelayQueue用于阻塞等待下一个到期的槽位private[this]val delayQueuenewDelayQueue[TimerTaskList]()// 守护线程负责推进时间private[this]val readWriteLocknewReentrantReadWriteLock()private[this]val writeLockreadWriteLock.writeLock()// 核心驱动线程 /** * 添加任务时如果这会改变最近到期时间可能需要唤醒驱动线程。 * 但 Kafka 的实现更巧妙 * * 驱动线程用 delayQueue.take() 阻塞等待最近到期的槽位 * → 当有新的更早到期的任务加入时 → 新槽位被 offer 进 queue * → 不需要显式唤醒take() 本身就是阻塞等待的 */// 添加任务 defadd(timerTask:TimerTask):Unit{readLock.lock()try{// 创建 TimerTaskEntry 包装任务val taskEntrynewTimerTaskEntry(timerTask,timerTask.delayMsSystem.currentTimeMillis())// 委托给时间轮处理 add →// → 过期了返回 false直接运行// → 没过期放入对应层级、对应槽位if(!timingWheel.add(taskEntry)){// 已过期的任务直接在线程池中执行if(!taskEntry.cancelled){taskExecutor.submit(timerTask)}}}finally{readLock.unlock()}}// 推进时间由 AdvanceThread 调用 defadvanceClock(timeoutMs:Long):Boolean{// 从 DelayQueue 获取最近到期的槽位阻塞等待最多 timeoutMs 毫秒varbucketdelayQueue.poll(timeoutMs,TimeUnit.MILLISECONDS)if(bucket!null){writeLock.lock()try{while(bucket!null){// 推进时间轮到该槽位的过期时间timingWheel.advanceClock(bucket.getExpiration())// 执行该槽位的所有到期任务// flush 内部会遍历链表对每个任务调用 task.run()bucket.flush(reinsert)// 尝试获取下一个到期的槽位不阻塞bucketdelayQueue.poll()}}finally{writeLock.unlock()}true}else{false// 没有到期的任务}}// 任务降级reinsert /** * 当一个任务在上层时间轮到期时它被降级到下层。 * 但直接用 add() 就行 —— add() 会自动判断该放哪一层。 */private[this]val reinsert:TimerTaskEntryUnit{timerTaskEntryaddTimerTaskEntry(timerTaskEntry)}}七、完整运行流程一个30ms延迟任务的生命周期通过一个完整例子串联所有组件【30ms延迟任务的完整生命周期】 假设tickMs1ms, wheelSize20, startMs0 第1层跨度 20ms, 第2层跨度 400ms 时间线: ───────────────────────────────────────────────────────────► T0ms: add(task, delay30ms) │ │ SystemTimer.add() │ └─ timingWheel.add(taskEntry) │ │ │ ├─ expiration30 currentTimetickMs(1)? NO │ ├─ expiration30 currentTimeinterval(20)? NO │ └─ 创建 overflowWheel (tickMs20ms, interval400ms) │ └─ overflowWheel.add(taskEntry) │ │ 当前第2层 currentTime0 │ ├─ expiration30 020? NO │ ├─ expiration30 0400? YES → 放入第2层 │ │ slot (30/20) % 20 槽位1 │ │ bucket[1].setExpiration(20ms) │ │ delayQueue.offer(bucket[1]) │ └─ true ✓ │ ▼ T20ms: delayQueue.poll() 返回 bucket[1] │ │ SystemTimer.advanceClock() │ │ │ ├─ timingWheel.advanceClock(20) │ │ currentTime 20ms │ │ overflowWheel.advanceClock(20) → 递归推进上层 │ │ │ └─ bucket[1].flush(reinsert) │ │ │ ├─ 遍历链表中的 taskEntry │ ├─ 对每个 taskEntry 调用 reinsert │ │ └─ timingWheel.add(taskEntry) ← 再次添加 │ │ │ expiration30 201? NO │ │ │ expiration30 2020? YES → 放入第1层 │ │ │ slot (30/1) % 20 槽位10 │ │ │ bucket[10].setExpiration(30ms) │ │ │ delayQueue.offer(bucket[10]) │ │ └─ true ✓ │ │ │ └─ 清空 bucket[1] │ ▼ T30ms: delayQueue.poll() 返回 bucket[10] │ │ SystemTimer.advanceClock() │ │ │ ├─ timingWheel.advanceClock(30) │ │ currentTime 30ms │ │ │ └─ bucket[10].flush(reinsert) │ │ │ ├─ 遍历链表中的 taskEntry │ ├─ 对每个 taskEntry 调用 reinsert │ │ └─ timingWheel.add(taskEntry) │ │ expiration30 301? YES → 返回 false │ │ ──► SystemTimer.add() 中: │ │ taskExecutor.submit(timerTask) ★ │ │ └─ timerTask.run() → forceComplete() │ │ │ └─ 清空 bucket[10] │ ▼ TASK EXECUTED ✓八、时间轮 vs 其他定时器方案的终极对比维度Timer (堆)DelayQueue (堆)ScheduledThreadPoolKafka TimingWheel插入O(log N)O(log N)O(log N)O(1)删除O(log N)O(log N)O(log N)O(1)推进O(1)O(1)O(1)O(1)线程模型单线程无自带线程线程池单推进线程 任务线程池海量任务❌ 性能退化严重❌ 同左❌ 同左✅ 专为此设计精度毫秒级毫秒级毫秒级毫秒级由tickMs决定内存开销小小小较大预分配槽数组适用场景1000任务5000任务10000任务万级以上任务Kafka时间轮的必杀技O(1) 时间复杂度的增删—— 不需要调整堆直接哈希定位槽位DelayQueue 驱动推进—— 不需要轮询阻塞等待最近到期的槽位零 CPU 浪费层级设计—— 长延迟任务自动上浮到粗粒度层到期时再沉降回来惰性创建—— overflowWheel 只在需要时才创建节省内存本篇小结Kafka的时间轮是一个教科书级别的空间换时间设计单层时间轮用哈希定位实现O(1)的增删但时间跨度受限层级时间轮通过上层粗粒度、下层细粒度的分层设计既保持O(1)性能又能覆盖任意长度的时间范围SystemTimer用DelayQueue 推进线程优雅地驱动整个时间轮无需轮询reinsert机制实现了任务在不同层级之间的升降级延迟长时放在粗粒度层快到期时降到细粒度层这套设计从Kafka 0.8版本就开始使用经受住了全球数万集群的考验。理解了它你再看Netty的HashedWheelTimer、Dubbo的时间轮会发现思路其实是相通的。上一篇【第47篇】Kafka延迟操作DelayedOperation源码解析——优雅处理等待响应下一篇【第49篇】Kafka副本机制源码解析一——副本的世界观