java并发中DelayQueue延迟队列原理剖析

网友投稿 255 2023-01-09

java并发中DelayQueue延迟队列原理剖析

介绍

DelayQueue队列是一个延迟队列,DelayQueue中存放的元素必须实现Delayed接口的元素,实现接口后相当于是每个元素都有个过期时间,当队列进行take获取元素时,先要判断元素有没有过期,只有过期的元素才能出队操作,没有过期的队列需要等待剩余过期时间才能进行出队操作。

源码分析

DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据,它采用的是二叉堆进行的优先队列,使用ReentrantLock锁来控制线程同步,由于内部元素是采用的PriorityQueue来进行存放数据,所以Delayed接口实现了Comparable接口,用于比较来控制优先级,如下代码所示:

public interface Delayed extends Comparable {

/**

* Returns the remaining delay associated with this object, in the

* given time unit.

*

* @param unit the time unit

* @return the remaining delay; zero or negative values indicate

* that the delay has already elapsed

*/

long getDelay(TimeUnit unit);

}

DelayQueue的成员变量如下所http://示:

// 锁。

private final transient ReentrantLock lock = new ReentrantLock();

// 优先队列。

private final PriorityQueue q = new PriorityQueue();

/**

* Leader-Follower的变种。

* Thread designated to wait for the element at the head of

* the queue. This variant of the Leader-Follower pattern

* (http://cs.wustl.edu/~schmidt/POSA/POSA2/) serves to

* minimize unnecessary timed waiting. When a thread becomes

* the leader, it waits only for the next delay to elapse, but

* other threads await indefinitely. The leader thread must

* signal some other thread before returning from take() or

* poll(...), unless some other thread becomes leader in the

* interim. Whenever the head of the queue is replaced with

* an element with an earlier expiration time, the leader

* field is invalidated by being reset to null, and some

* waiting thread, but not necessarily the current leader, is

* signalled. So waiting threads must be prepared to acquire

* and lose leadership while waiting.

*/

private Thread leader = null;

/**

* Condition signalled when a newer element becomes available

* at the head of the queue or a new thread may need to

* become leader.

*/

// 条件,代表如果有数据则通知Follower线程,唤醒线程处理队列内容。

private final Condition available = lock.newCondition();

Leader-Follower模式的变种,用于最小化不必要的定时等待,当一个线程被选择为Leader时,它会等待延迟过去执行代码逻辑,而其他线程则需要无限期等待,在从take或poll返回之前,每当队列的头部被替换为具有更早到期时间的元素时,leader字段将通过重置为空而无效,Leader线程必须向其中一个Follower线程发出信号,被唤醒的 follwer 线程被设置为新的Leader 线程。

offer操作

public boolean offer(E e) {

// 获取到锁

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 将元素存储到PriorityQueue优先队列中

q.offer(e);

// 如果第一个元素是当前元素,说明之前队列中为空,则先将Leader设置为空,通知等待线程可以争抢Leader了。

if (q.peek() == e) {

leader = null;

available.signal();

}

// 返回成功

return true;

} finally {

lock.unlock();

}

}

offer操作前先进行获取锁的操作,也就是同一时间内只能有一个线程可以入队操作。

获取到ReentrantLock锁对象。

将元素添加到PriorityQueue优先队列中

如果队列bZxkIs中最早过期的元素是自己,则说明队列原先是空的,所以将Leader进行重置,通知Follower线程可以成为Leader线程。

最后进行解锁操作。

put操作

put操作其实就是调用的offer操作来进行添加数据的,以下是源码信息:

public void put(E e) {

offer(e);

}

take操作

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

// 获取可中断的锁。

lock.lockInterruptibly();

try {

// 循环获取数据。

for (;;) {

// 获取最早过期的元素,但是不弹出对象。

E first = q.peek();

// 如果最早过期的元素为空,说明队列为空,则线程直接进入无限期等待,并且让出锁。

if (first == null)

// 当前线程无限期等待,直到被唤醒,并且让出锁对象。

available.await();

else {

// 获取最早过期的元素剩余过期时间。

long delay = first.getDelay(NANOSECONDS);

// 如果剩余过期时间小于0,则说明已经过期,反之还没有过期。

if (delay <= )

// 如果已经过期直接获取最早过期的元素,并返回。

return q.poll();

// 如果剩余过期日期大于0,则会进入到这里。

// 将刚才获取的最早过期的元素设置为空。

first = null; // don't retain ref while waiting

// 如果有线程争抢的Leader线程,则进行无限期等待。

if (leader != null)

// 无限期等待并让出锁。

available.await();

else {

// 获取当前线程。

Thread thisThread = Thread.currentThread();

// 设置当前线程变为Leader线程。

leader = thisThread;

try {

// 等待剩余等待时间。

available.awaitNanos(delay);

} finally {

// 将Leader设置为null。

if (leader == thisThread)

leader = null;

}

}

}

}

} finally {

// 如果队列不为空,并且没有Leader则通知等待线程可以成为Leader。

if (leader == null && q.peek() != null)

// 通知等待线程。

available.signal();

lock.unlock();

}

}

当获取元素时,先获取到锁对象。

获取最早过期的元素,但是并不从队列中弹出元素。

最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。

如果最早过期的元素不为空

获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素

如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。

最后将Leader线程设置为空

如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

poll操作

获取最早过期的元素,如果队列头没有过期的元素则直接返回null,反之返回过期的元素。

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

E first = q.peek();

// 如果队列为空或者队列最早过期的元素没有过期,则返回null。

if (first == null || first.getDelay(NANOSECONDS) > 0)

return null;

else

// 出队列操作。

return q.poll();

} finally {

lock.unlock();

}

}

小结

DelayQueue是一个无界的并发延迟阻塞队列,队列中的元素必须实现Delayed接口,相应了需要实现Comparable接口实现比较的方法

Leader-Follower模式的变种,用于最小化不必要的定时等待,当一个线程被选择为Leader时,它会等待延迟过去执行代码逻辑,而其他线程则需要无限期等待,在从take或poll返回之前,每当队列的头部被替换为具有更早到期时间的元素时,leader字段将通过重置为空而无效,Leader线程必须向其中一个Follower线程发出信号,被唤醒的 follwer 线程被设置为新的Leader 线程。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IDEA SSM整合Redis项目实例 附源码
下一篇:api接口 网站(API网站)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~