侧边栏壁纸
博主头像
憨憨大头个人博客博主等级

心存希冀,目有繁星

  • 累计撰写 110 篇文章
  • 累计创建 13 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

延迟队列(DelayQueue)

Administrator
2024-08-03 / 0 评论 / 0 点赞 / 10 阅读 / 13911 字

title: 延迟队列(DelayQueue)
id: 32485db0-3ae1-45fc-9574-5c0331e1bfc9
date: 2023-03-08 11:39:14
auther: songguangzhi
cover:
excerpt: 一、什么是延迟队列(DelayQueue)? DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。 /** * An unbounded {@linkplain BlockingQueue blocki
permalink: /archives/delay-queue
categories:

  • jdk
  • mq
  • interview
    tags:
  • 后端

一、什么是延迟队列(DelayQueue)?

DelayQueue 是 Java 并发包 java.util.concurrent 下的一个 Class,其官方定义如下所示。

		/**
         * An unbounded {@linkplain BlockingQueue blocking queue} of
         * {@code Delayed} elements, in which an element can only be taken
         * when its delay has expired.  The <em>head</em> of the queue is that
         * {@code Delayed} element whose delay expired furthest in the
         * past.  If no delay has expired there is no head and {@code poll}
         * will return {@code null}. Expiration occurs when an element's
         * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
         * than or equal to zero.  Even though unexpired elements cannot be
         * removed using {@code take} or {@code poll}, they are otherwise
         * treated as normal elements. For example, the {@code size} method
         * returns the count of both expired and unexpired elements.
         * This queue does not permit null elements.
         * /

由定义可知,DelayQueue 是一个无界阻塞队列,队列中的元素只有在延迟期满后才能被取出。队列的头部存储的是最先到期的元素。添加进该队列的元素必须实现 Delayed 接口,指定延迟时间,元素过期的判断是根据 getDelay(TimeUnit unit) 方法的返回值,返回值小于等于 0,则认为元素过期。队列不允许存储空元素。

二、DelayQueue 的使用场景

DelayQueue 被用于需要延迟处理任务的场景,例如,网民在网上商城下单后,如果超时未支付,订单会被后台系统关闭。这种需要延时处理的场景就可以采用 DelayQueue 实现。

三、原理解析(源码)
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
// ...
}

DelayQueue 类继承了AbstractQueue,并实现了 BlockingQueue 接口,DelayQueue 的泛型参数(即队列中的元素)要实现 Delayed 接口。Delayed 接口定义如下。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

Delayed 接口继承了 Comparable 接口,Comparable 接口定义如下。

public interface Comparable<T> {
	public int compareTo(T o);
}

所以,延迟队列中的元素要实现 getDelay(TimeUnit unit) 和 compareTo(T o) 两个方法。

  • compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚
  • getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。

延迟队列的属性

DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。

DelayQueue 的主要方法

offer 添加元素

public boolean offer(E e) {
        // 获取全局独占锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向优先队列中插入元素
            q.offer(e);
            // 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            // 释放全局独占锁
            lock.unlock();
        }
    }

  • leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。

  • 如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。

  • 如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

 DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

take 取出元素

take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。

public E take() throws InterruptedException {
        // 获取全局独占锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取队头元素,peek 方法不会删除元素
                E first = q.peek();
                if (first == null)
                    // 若队头为空,则阻塞当前线程
                    available.await();
                else {
                    // 否则获取队头元素的超时时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 已超时,直接出队
                    if (delay <= 0)
                        return q.poll();
                    // 释放 first 的引用,避免内存泄漏
                    first = null; // don't retain ref while waiting
                    // leader != null 表明有其他线程在操作,阻塞当前线程
                    if (leader != null)
                        available.await();
                    else {
                        // leader 指向当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 超时阻塞
                            available.awaitNanos(delay);
                        } finally {
                            // 释放 leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放全局独占锁    
            lock.unlock();
        }
    }

poll 取出元素

取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
图解 DelayQueue 的生产/消费过程

DelayQueue 是 Leader-Follower 模式的变种,以下通过队列及消费者线程状态变化大致说明一下 DelayQueue 的运行过程。

41e75f7015e5479090f881bd88365a3b

  • 因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单,所以此图不考虑向队列添加元素的生产者线程。假设现在共有三个消费者线程。
  • 队列中的元素按到期时间排序,队列头部的元素 2s 以后到期。消费者线程1查看了头部元素以后,发现还需要 2s 才到期,于是它进入等待状态,2s 以后醒来,等待头部元素到期的线程称为 Leader 线程。
  • 消费者线程 2 与消费者线程 3 处于待命状态,它们不等待队列中的非头部元素。当消费者线程1拿到对象 5 以后,会向它们发送 signal。这个时候两个线程中的一个会结束待命状态而进入等待状态。
0

评论区