title: 延迟队列(DelayQueue)
id: 32485db0-3ae1-45fc-9574-5c0331e1bfc9
date: 2023-03-08 11:39:14
auther: songguangzhi
cover:
excerpt: 一、什么是延迟队列(DelayQueue)? DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。 /** * An unbounded {@linkplain BlockingQueue blocki
permalink: /archives/delay-queue
categories:
- jdk
- mq
- interview
tags: - 后端
一、什么是延迟队列(DelayQueue)?
DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。
/**
* An unbounded {@linkplain BlockingQueue blocking queue} of
* {@code Delayed} elements, in which an element can only be taken
* when its delay has expired. The <em>head</em> of the queue is that
* {@code Delayed} element whose delay expired furthest in the
* past. If no delay has expired there is no head and {@code poll}
* will return {@code null}. Expiration occurs when an element's
* {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
* than or equal to zero. Even though unexpired elements cannot be
* removed using {@code take} or {@code poll}, they are otherwise
* treated as normal elements. For example, the {@code size} method
* returns the count of both expired and unexpired elements.
* This queue does not permit null elements.
* /
由定义可知,DelayQueue 是一个无界阻塞队列,队列中的元素只有在延迟期满后才能被取出。队列的头部存储的是最先到期的元素。添加进该队列的元素必须实现 Delayed 接口,指定延迟时间,元素过期的判断是根据 getDelay(TimeUnit unit) 方法的返回值,返回值小于等于 0,则认为元素过期。队列不允许存储空元素。
二、DelayQueue 的使用场景
DelayQueue 被用于需要延迟处理任务的场景,例如,网民在网上商城下单后,如果超时未支付,订单会被后台系统关闭。这种需要延时处理的场景就可以采用 DelayQueue 实现。
三、原理解析(源码)
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// ...
}
DelayQueue 类继承了AbstractQueue,并实现了 BlockingQueue 接口,DelayQueue 的泛型参数(即队列中的元素)要实现 Delayed 接口。Delayed 接口定义如下。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed 接口继承了 Comparable 接口,Comparable 接口定义如下。
public interface Comparable<T> {
public int compareTo(T o);
}
所以,延迟队列中的元素要实现 getDelay(TimeUnit unit) 和 compareTo(T o) 两个方法。
- compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚
- getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
延迟队列的属性
DelayQueue 中的重要属性如下所示。
// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();
DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。
DelayQueue 的主要方法
offer 添加元素
public boolean offer(E e) {
// 获取全局独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 向优先队列中插入元素
q.offer(e);
// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
// 释放全局独占锁
lock.unlock();
}
}
-
leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
-
如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
-
如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。
DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。
take 取出元素
take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
public E take() throws InterruptedException {
// 获取全局独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队头元素,peek 方法不会删除元素
E first = q.peek();
if (first == null)
// 若队头为空,则阻塞当前线程
available.await();
else {
// 否则获取队头元素的超时时间
long delay = first.getDelay(NANOSECONDS);
// 已超时,直接出队
if (delay <= 0)
return q.poll();
// 释放 first 的引用,避免内存泄漏
first = null; // don't retain ref while waiting
// leader != null 表明有其他线程在操作,阻塞当前线程
if (leader != null)
available.await();
else {
// leader 指向当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超时阻塞
available.awaitNanos(delay);
} finally {
// 释放 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列
if (leader == null && q.peek() != null)
available.signal();
// 释放全局独占锁
lock.unlock();
}
}
poll 取出元素
取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
图解 DelayQueue 的生产/消费过程
DelayQueue 是 Leader-Follower 模式的变种,以下通过队列及消费者线程状态变化大致说明一下 DelayQueue 的运行过程。
- 因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单,所以此图不考虑向队列添加元素的生产者线程。假设现在共有三个消费者线程。
- 队列中的元素按到期时间排序,队列头部的元素 2s 以后到期。消费者线程1查看了头部元素以后,发现还需要 2s 才到期,于是它进入等待状态,2s 以后醒来,等待头部元素到期的线程称为 Leader 线程。
- 消费者线程 2 与消费者线程 3 处于待命状态,它们不等待队列中的非头部元素。当消费者线程1拿到对象 5 以后,会向它们发送 signal。这个时候两个线程中的一个会结束待命状态而进入等待状态。
评论区