介绍
时间轮算法的应用非常广泛,在 Dubbo、Netty、Kafka、ZooKeeper、Quartz 的组件中都有时间轮思想的应用,甚至在 Linux 内核中都有用到。
使用场景:
如果一个系统中存在着大量的调度任务,而大量的调度任务如果每一个都使用自己的调度器来管理任务的生命周期的话,浪费cpu的资源并且很低效。
时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable)。能够高效的管理各种延时任务,周期任务,通知任务等等。
不过,时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合。因为时间轮算法的精度取决于,时间段“指针”单元的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。
常用网络通信的心跳检测,需要大量的定时任务,但是执行任务较少的场景。
或者是关闭订单这种,任务不需要太多资源但是又要保证即时释放的
所以时间轮一般满足的特点是短,平,快的一些业务场景
时间轮结构
如图,JRaft中时间轮(HashedWheelTimer)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(HashedWheelBucket),HashedWheelBucket是一个环形的双向链表,链表中的每一项表示的都是定时任务项(HashedWheelTimeout),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度。时间轮的时间格个数是固定的,可用 wheel.length 来表示。
时间轮还有一个表盘指针(tick),用来表示时间轮当前指针跳动的次数,可以用tickDuration * (tick + 1)来表示下一次到期的任务,需要处理此时间格所对应的 HashedWheelBucket 中的所有任务。
时间轮运行逻辑
时间轮的基本组成
- wheel:时间轮子
- wheelSize:时间轮的大小
- tickDuration:时间跨度,可以理解一格所代表的时间
- timeout:时间到所需要执行的东西,封装了定时任务
- currentIndex: 时间轮还有一个表盘指针,用来表示时间轮当前所处的时间
运行时间轮时,时间轮可以分为多个槽,每个槽上面存在一个定时任务的链表。会去遍历当前的链表,如果当前的剩余圈数等于0了,则会执行相应的定时任务,反正则会把remainingRounds - 1,等到下一次转动到这里的时候进行判断,实现定时功能。
当时间轮添加任务的时候,只需要将要执行时间的timeOut算出,需要在时间轮当中执行几圈然后对应的槽位置,然后添加到相应的链表上,让工作线程不断轮询wheel,实现定时调度的结果。
时间轮由固定个数(wheelSize)的时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs),整个时间轮的总体时间跨度(interval)为 tickMs*wheelSize。
时间轮还有一个表盘指针(currentIndex),用来表示时间轮当前所处的时间,currentIndex是 tickMs 的整数倍。 currentIndex可以将整个时间轮划分为到期部分和未到期部分,currentIndex当前指向的时间格刚好到期,需要处理此时间格对应的 TimerTaskList 中的所有任务。
java简单实现
首先定义时间轮里面的“槽”,存储采用链表,方便添加和断开
@Getter
@Setter
public class Slot {
private final int id;
/**
* 执行任务
*/
private LinkedList<TimeOut> workList;
public Slot(int id) {
this.id = id;
}
/**
* 添加定时任务到链表中
* @param work 定时任务
*/
public void add(TimeOut work) {
// 获取对应位置
if (CollectionUtil.isEmpty(this.workList)){
LinkedList<TimeOut> timeOutList = new LinkedList<>();
timeOutList.add(work);
this.workList = timeOutList;
}else {
// 获取最后一个timeOut指向下一个timeOut
TimeOut last = this.workList.getLast();
last.setNextTimeOut(work);
this.workList.add(work);
}
}
/**
* 移除定时任务
* @param work 定时任务
* @return 成功还是失败
*/
public Boolean remove(TimeOut work) {
return workList.remove(work);
}
/**
* 获取当前槽的元素
* @return 返回存储鼎盛时任务的链表
*/
public LinkedList<TimeOut> elements() {
return workList;
}
}
定义一个定时任务,用Runnable作为任务,方便调度
/**
* 定时器载体
* @author sgz
*/
@Getter
@Setter
public class TimeOut {
/**
* 剩余圈数
*/
private int remainingRounds;
/**
* 执行的任务
*/
private Runnable runnable;
/**
* 下一个节点
*/
private TimeOut nextTimeOut;
}
再定义一个线程池,防止单个线程执行时间过长导致轮询下一个任务超时,调度完成后异步的去执行
public class ThreadUtils {
/**
* 工作队列,用来存储等待执行的任务
* ArrayBlockingQueue:基于数组的有界阻塞队列,此队列按FIFO原则对元素进行排序
*/
private static final ArrayBlockingQueue<Runnable> ARRAY_BLOCKING_QUEUE = new ArrayBlockingQueue<>(5000);
/**
* 核心线程数
* 当已创建的线程数大于corePoolSize后,任务将被放入任务队列中
*/
private static final int CORE_POOL_SIZE = 4;
/**
* 最大线程数,线程池允许创建的最大线程数
* 当任务队列已经放满了,且已创建线程数小于maximumPoolSize时,则线程池会创建新的线程执行任务
*/
private static final int MAXIMUM_POOL_SIZE = 8;
/**
* 空闲线程存活时间
* 只有当已创建的线程数大于corePoolSize时,此参数才会起作用
*/
private static final long KEEP_ALIVE_TIME = 3L;
/**
* 参数keepAliveTime的时间单位
*/
private static final TimeUnit UNIT = TimeUnit.SECONDS;
/**
* 线程池执行器
*/
private static ThreadPoolExecutor executor;
/**
* 拒绝策略 CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
*/
private static final RejectedExecutionHandler REJECTED_HANDLER = new ThreadPoolExecutor.AbortPolicy();
// 初始化线程池执行器
static {
if (null == executor) {
ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().build();
executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
UNIT,
ARRAY_BLOCKING_QUEUE,
THREAD_FACTORY,
REJECTED_HANDLER);
}
}
/**
* 提交任务
* @param command 线程
*/
public static void execute(Runnable command) {
executor.execute(command);
}
/**
* 提交任务
* @param command 线程
*/
public static Future<?> submit(Runnable command) {
return executor.submit(command);
}
}
时间轮如下,TimeOut
保存了下一节点的信息,能稍微提升性能。没有则不进行遍历
@Slf4j
public class TimingWheel {
/**
* 一轮持续时间
*/
private final long tickDuration;
/**
* 一共分成几槽
*/
private final int ticksPerWheel;
/**
* 每槽所执行时间
*/
private final long tickTime;
/**
* 当前刻度轮数
*/
private volatile int currentTickIndex = 0;
/**
* 轮子
*/
private final ArrayList<Slot> wheel;
/**
* 开关
*/
private final AtomicBoolean shutdown = new AtomicBoolean(false);
/**
* 锁
*/
private final ReadWriteLock lock = new ReentrantReadWriteLock();
/**
* 工
*/
private final Thread workerThread;
// ~ -------------------------------------------------------------------------------------------------------------
/**
* 构造时间轮
* @param tickDuration 一轮持续时间
* @param ticksPerWheel 一轮分成几份
* @param timeUnit 一轮持续时间的单位
*/
public TimingWheel(int tickDuration, int ticksPerWheel, TimeUnit timeUnit) {
if (timeUnit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
this.wheel = new ArrayList<>();
this.tickDuration = TimeUnit.MILLISECONDS.convert(tickDuration, timeUnit);
this.ticksPerWheel = ticksPerWheel + 1;
this.tickTime = this.tickDuration / this.ticksPerWheel;
for (int i = 0; i < this.ticksPerWheel; i++) {
wheel.add(new Slot(i));
}
// 修剪大小
wheel.trimToSize();
workerThread = new Thread(new TickWorker(), "Timing-Wheel");
long convert = TimeUnit.SECONDS.convert(this.tickDuration, TimeUnit.MILLISECONDS);
log.info("时间轮初始化完成--------(一轮所需时间为:{}秒, 每槽时间:{}ms, 一共分成{}槽)", convert, tickTime, this.ticksPerWheel);
}
// ~ -------------------------------------------------------------------------------------------------------------
public void start() {
if (shutdown.get()) {
throw new IllegalStateException("Cannot be started once stopped");
}
if (!workerThread.isAlive()) {
System.out.println("工作线程启动");
workerThread.start();
}
}
public boolean stop() {
if (!shutdown.compareAndSet(false, true)) {
return false;
}
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException e) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return true;
}
/**
* 获取上一个刻度轮数
* @return 上一个刻度
*/
private int getPreviousTickIndex() {
lock.readLock().lock();
try {
int cti = currentTickIndex;
if (cti == 0) {
return ticksPerWheel - 1;
}
return cti - 1;
} finally {
lock.readLock().unlock();
}
}
/**
* 增加定时器任务
* @param runnable 任务
* @param time 启动偏移时间
*/
public void addTask(Runnable runnable, long time){
// 需要几圈
long round = time / tickDuration;
// 余数
long remainder = time % tickDuration;
// 第几槽
int sortIndex = (int) (remainder / tickTime);
Slot slot = wheel.get(sortIndex);
// 定时器对象
TimeOut timeOut = new TimeOut();
timeOut.setRunnable(runnable);
timeOut.setRemainingRounds((int) round);
// 将定时器对象添加到槽当中
slot.add(timeOut);
}
// ~ -------------------------------------------------------------------------------------------------------------
/**
* 工作线程,不断轮询时间轮
*/
private class TickWorker implements Runnable {
private long startTime;
/**
* 计数,槽数
*/
private long tick = 1;
@Override
public void run() {
startTime = System.currentTimeMillis();
// 循环获取时间
for (int i = 0; !shutdown.get(); i++) {
if (i == wheel.size()) {
i = 0;
}
lock.writeLock().lock();
try {
currentTickIndex = i;
} finally {
lock.writeLock().unlock();
}
System.out.println("当前正在检查第" + currentTickIndex + "槽的定时任务");
this.notifyExpired(currentTickIndex);
// 等待校验时间
waitForNextTick();
}
}
/**
* 唤醒当前时间的任务
* @param currentTickIndex 当前刻度
*/
private void notifyExpired(int currentTickIndex) {
// 获取当前第一槽
Slot eSlot = wheel.get(currentTickIndex);
LinkedList<TimeOut> elements = eSlot.elements();
if (CollectionUtil.isEmpty(elements)){
return;
}
TimeOut timeOut = elements.get(0);
// 执行定时任务
executionTimeOut(timeOut, eSlot);
}
/**
* 唤醒定时器看是否需要执行
* @param timeOut 定时器
*/
private void executionTimeOut(TimeOut timeOut,Slot eSlot) {
int round = timeOut.getRemainingRounds();
// 圈数为0则执行任务
if (round <= 0){
System.out.println("正在执行定时任务, 当前槽数" + currentTickIndex);
// 先移除后执行任务
eSlot.remove(timeOut);
ThreadUtils.execute(timeOut.getRunnable());
}else {
timeOut.setRemainingRounds(round - 1);
}
if (timeOut.getNextTimeOut() != null){
executionTimeOut(timeOut.getNextTimeOut(), eSlot);
}
}
/**
* 等待时间,可以理解为校准时间
*/
private void waitForNextTick() {
for (;;) {
long currentTime = System.currentTimeMillis();
long sleepTime = tickTime * tick - (currentTime - startTime);
if (sleepTime <= 0) {
break;
}
// 时间轮所算时间
DateTime date = DateUtil.date(tickTime * tick + startTime);
try {
System.out.println("睡眠时间为" + sleepTime + "毫秒--------------当前应为时间" + date);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
return;
}
}
tick++;
}
}
}
定义了一个工作线程TickWorker,当调用start则开始工作,不断轮询。基本使用如下
public class TimingWheelTest {
public static void main(String[] args) {
TimingWheel timingWheel = new TimingWheel(10, 9, TimeUnit.SECONDS);
timingWheel.addTask(() -> System.err.println("执行定时器,当前时间" + DateUtil.now()), 4000);
timingWheel.start();
}
}
总结
此时间轮参考netty的HashedWheelTimer
来实现的有兴趣可以去查看,这是简单版本。
实现都围绕三个要素:任务、任务的组织者(队列),执行者调度执行者。
时间轮的实现类似钟表的运作方式,它的任务插入和删除时间复杂度都为O(1),相对而言时间轮更适合任务数很大的延时场景。
对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数(Netty),二是多层次时间轮(Kakfa)。
相比而言Netty的实现会有空推进的问题,而Kafka采用DelayQueue来保存有任务数据的槽,利用空间换时间的思想解决了空推进的问题。
评论区