Java常见的阻塞队列总结

网友投稿 290 2023-01-10

Java常见的阻塞队列总结

java阻塞队列

阻塞队列和普通队列主要区别在阻塞二字:

阻塞添加:队列已满时,添加元素线程会阻塞,直到队列不满时才唤醒线程执行添加操作

阻塞删除:队列元素为空时,删除元素线程会阻塞,直到队列不为空再执行删除操作

常见的阻塞队列有 LinkedBlockingQueue 和 ArrayBlockingQueue,其中它们都实现 BlockingQueue 接口,该接口定义了阻塞队列需实现的核心方法:

public interface BlockingQueue extends Queue {

// 添加元素到队尾,成功返回true,队列满抛出异常 IllegalStateException

boolean add(E e);

// 添加元素到队尾,成功返回 true,队列满返回 false

boolean offer(E e);

// 阻塞添加

void put(E e) throws InterruptedException;

// 阻塞添加,包含最大等待时长

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

// 阻塞移除队顶元素

E take() throws InterruptedException;

// 阻塞移除队顶元素,包含最大等待时长

E poll(long timeout, TimeUnit unit) throws InterruptedException;

// 返回可以添加到队列不阻塞的最大数量

int remainingCapacity();

// 如果存在元素则删除,成功返回 true,失败返回 false

boolean remove(Object o);

// 是否包含某元素

public boolean contains(Object o);

// 批量移除元素并添加入指定集合

int drainTo(Collection super E> c);

// 批量移除包含最大数量

int drainTo(Collection super E> c, int maxElements);

}

除了上面的方法,还有三个继承自 Queue 接口的方法常常被用到:

// 获取队列头元素,不删除,没有抛出异常 NoSuchElementException

E element();

// 获取队列头元素,不删除,没有返回 null

E peek();

// 获取并移除队列头元素,没有返回 nul

E poll();

根据具体作用,方法可以被分为以下三类:

添加元素类:add() 成功返回 true,失败抛异常、offer() 成功返回 true,失败返回 false,可以定义最大等待时长、put() 阻塞方法

删除元素类:remove() 成功返回 true,失败返回 false、poll() 成功返回被移除元素,为空返回 null、take() 阻塞方法

查询元素类:element() 成功返回元素,否则抛出异常、peek() 返回对应元素或 null

根据方法类型又可以分为阻塞和非阻塞,其中 put()、take() 是阻塞方法,带最大等待时长的 offer() 和 poll() 也是阻塞方法,其余都是非阻塞方法,阻塞队列基于上述方法实现

ArrayBlockingQueue 基于数组实现,满足队列先进先出特性,下面我们通过一段代码初步认识:

public class ArrayBlockingQueueTest {

ArrayBlockingQueue queue = new ArrayBlockingQueue(1);

public static void main(String[] args) {

ArrayBlockingQueueTest test = new ArrayBlockingQueueTest();

new Thread(test.new Product()).start();

new Thread(test.new Customer()).start();

}

class Product implements Runnable {

@Override

public void run() {

while (true) {

try {

queue.put(new TestProduct());

System.out.println("生产者创建产品等待消费者消费");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

class Customer implements Runnable {

@Override

public void run() {

while (true) {

try {

Thread.sleep(1000);

queue.take();

System.out.println("消费者消费产品等待生产者创建");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

class TestProduct {

}

}

上述代码比较简单,在一个容量为1的阻塞队列中,生产者和消费者由于容量限制依次阻塞运行。

ArrayBlockingQueue 基于 ReentrantLock 锁和 Condition 等待队列实现,因此存在公平和非公平的两种模式。公平场景下所有被阻塞的线程按照阻塞顺序执行,非公平场景下,队列中的线程和恰好准备进入队列的线程竞争,谁抢到就是谁的。默认使用非公平锁,因为效率更高:

public ArrayBlockingQueue(int capacity) {

this(capacity, false);

}

public ArrayBlockingQueue(int capacity, boolean fair) {

if (capacity <= 0)

throw new IllegalArgumentException();

this.items = new Object[capacity];

lock = new ReentrantLock(fair);

notEmpty = lock.newCondition();

notFull = lock.newCondition();

}

从代码可以看出,ArrayBlockingQueue 通过一个 ReentrantLock 锁以及两个 Condition 等待队列实现,它的属性如下:

public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable {

// 保存数据的数组

final Object[] items;

// 移除元素的索引

int takeIndex;

// 添加元素的索引

int putIndex;

// 元素数量

int count;

// 用于并发控制的锁

final ReentrantLock lock;

// 不为空,用于take()操作

private final Condition notEmpty;

// 不满,用于put()操作

private final Condition notFull;

// 迭代器

transient Itrs itrs = null;

}

从代码可以看出,ArrayBlockingQueue 使用同一个锁、移除元素和添加元素通过数组下标的方式记录,分表表示队列头和队列尾。通过两个等待队列分别阻塞 take() 和 put() 方法,下面我们直接看源码:

public boolean add(E e) {

if (offer(e))

return true;

else

throw new IllegalStateException("Queue full");

}

public boolean offer(E e) {

// 检查是否为空

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 判断队列是否已满

if (count == items.length)

return false;

else {

enqueue(e);

return true;

}

} finally {

lock.unlock();

}

}

private void enqueue(E x) {

final Object[] items = this.items;

// 赋值保存数据

items[putIndex] = x;

// 循环复用空间

if (++putIndex == items.length)

putIndex = 0;

count++;

// 唤醒take线程

notEmpty.signal();

}

从代码可以看出:add() 方法基于 offer() 方法实现,offer() 方法添加失败返回 false 后,add() 方法抛出异常。offer() 方法会加锁,保证线程安全,队列没满时执行入队操作,入队操作通过操作数组实现,并且通过循环复用数组空间。元素添加成功后队列不为空,调用 signal() 方法唤醒移除元素的阻塞线程,最后我们看 put() 方法:

public void put(E e) throws InterruptedException {

// 判断不为空

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 队列满就挂起在等待队列

while (count == items.length)

notFull.await();

enqueue(e);

} finally {

lock.unlock();

}

}

从代码可以看出,当队列满时,当前线程会被挂起到等待队列中,直到队列不满时被唤醒执行添加操作。下面我们看删除操作:

public boolean remove(Object o) {

// 判断是否为 NULL

if (o == null) return false;

final Object[] items = this.items;

final ReentrantLock lock = this.lock;

lock.lock();

try {

if (count > 0) {

final int putIndex = this.putIndex;

int i = takeIndex;

// 从移除下标开始遍历到添加新元素的下标

do {

if (o.equals(items[i])) {

removeAt(i);

return true;

}

// 循环判断,移除下标可能大于添加下标(添加下标二次遍历时)

if (++i == items.length)

i = 0;

} while (i != putIndex);

}

return false;

} finally {

lock.unlock();

}

}

void removeAt(final int removeIndex) {

final Object[] items = this.items;

// 要删除的元素正好是移除下标

if (removeIndex == takeIndex) {

items[takeIndex] = null;

// 循环删除

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

} else {

final int putIndex = this.putIndex;

// 如果不是移除下标,从该下标开始到添加下标,所有元素左移一位

for (int i = removeIndex;;) {

int next = i + 1;

if (next == items.length)

next = 0;

if (next != putIndex) {

// 向左移除

items[i] = items[next];

i = next;

} else {

// 最后put下标置为null

items[i] = null;

this.putIndex = i;

break;

}

}

count--;

// 更新迭代器

if (itrs != null)

itrs.removedAt(removeIndex);

}

notFull.signal();

}

remove() 和 poll()、take() 不同,它可以删除指定的元素。这里需要考虑删除的元素不是移除索引指向的情况,从代码可以看出,当要删除的元素不是移除索引指向的元素时,将所有从被删除元素下标开始到添加元素下标所有元素左移一位。

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return (count == 0) ? null : dequeue();

} finally {

lock.unlock();

}

}

private E dequeue() {

final Object[] items = this.items;

E x = (E) items[takeIndex];

items[takeIndex] = null;

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued();

// 移除元素后唤醒put()添加线程

notFull.signal();

return x;

}

相比 remove() 方法,poll() 方法简单了很多,这里不做赘述,下面我们看 take():

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 队列为空就挂起

while (count == 0)

notEmpty.await();

return dequeue();

} finally {

lock.unlock();

}

}

take() 方法和 put() 方法可以说基本一致,相对也比较简单,最后我们来看看两个查询方法:

public E element() {YKfOLPx

E x = peek();

if (x != null)

return x;

else

throw new NoSuchElementException();

}

public E peek() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 直接返回移除元素下标对应的元素,也就是队列头

return itemAt(takeIndex);

} finally {

lock.unlock();

}

}

final E itemAt(int i) {

return (E) items[i];

}

element() 基于 peek() 方法实现实现、当队列为空时,peek() 方法返回 null,element() 抛出异常。关于 ArrayBlockingQueue 就介绍到这里

LinkedBlockingQueue 基于链表实现,它的属性如下:

public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable {

// 链表节点,存储元素

static class Node {

E item;

Node next;

Node(E x) { item = x; }

}

// 链表容量

private final int capacity;

// 当前元素数量

private final AtomicInteger count = new AtomicInteger();

// 头节点

transient Node head;

// 尾节点

private transient Node last;

// 删除锁

private final ReentrantLock takeLock = new ReentrantLock();

// 不为空等待队列

private final Condition notEmpty = takeLock.newCondition();

// 添加锁

private final ReentrantLock putLock = new ReentrantLock();

// 不满等待队列

private final Condition notFull = putLock.newCondition();

}

从代码可以看出,元素被封装为 Node 节点保存在单向链表中,其中链表默认长度为 Integer.MAX_VALUE,因此在使用时需注意内存溢出:当添加元素速度大于删除元素速度时,队列最终会记录到大量不会用到并且无法回收的对象,导致内存溢出。

ArrayBlockingQueue 和 LinkedBlockingQueue 的主要区别在于 ReentrantLock 锁的数量和等待队列,LinkedBlockingQueue 用到两个锁和两个等待队列,也就是说添加和删除操作可以并发执行,整体效率更高。下面我们直接看代码:

public boolean add(E e) {

if (offer(e))

return true;

else

throw new IllegalStateException("Queue full");

}

public boolean offer(E e) {

// 元素为空抛出异常

if (e == null) throw new NullPointerException();

// 获取当前队列容量

final AtomicInteger count = this.count;

// 队列已满时直接返回false

if (count.get() == capacity)

return false;

int c = -1;

Node node = new Node(e);

// 获取添加锁

final ReentrantLock putLock = this.putLock;

putLock.lock();

try {

// 二次判断,因为上面判断时未加锁,数据可能已更新

if (count.get() < capacity) {

// 入队操作

enqueue(node);

// 获取还未添加元素前,队列的容量

c = count.getAndIncrement();

if (c + 1 < capacity)

// 唤醒其它添加元素的线程

notFull.signal();

}

} finally {

putLock.unlock();

}

// 如果添加前队列没有数据,也就是说现在有一条数据时

if (c == 0)

// 唤醒take线程

signalNotEmpty();

return c >= 0;

}

private void enqueue(Node node) {

last = last.next = node;

}

private void signalNotEmpty() {

// 唤醒take线程前必须获取对应take锁

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();

notEmpty.signal();

} finally {

takeLock.unlock();

}

}

这里有以下几点需要我们注意:

1.LinkedBlockingQueue count 属性必须通过并发类封装,因为可能存在添加、删除两个线程并发执行,需考虑同步

2.这里需要判断两次的主要原因在于方法开始时并没有加锁,数值可能改变,因此在获取到锁后需要二次判断

3.和 ArrayBlockingQueue 不同,LinkedBlockingQueue 在队列不满时会唤醒添加线程,这样做的原因是 LinkedBlockingQueue 中添加和删除操作使用不同的锁,各自只需管好自己,还可以提高吞吐量。而 ArrayBlockingQueue 使用唯一锁,这样做会导致移除线程永远不被唤醒或添加线程永远不被唤醒,吞吐量较低

4.添加元素前队列长度为0才唤醒移除线程,因为队列长度为0时,移除线程肯定已经挂起,此时唤醒一个移除线程即可。因为移除线程和添加线程类似,都会自己唤醒自己。而 c>0 时只会有两种情况:存在移除线程在运行,如果有会递归唤醒,无须我们参与、不存在移除线程运行,此时也无须我们参与,等待调用 take()、poll() 方法即可

5.唤醒只针对 put()、take() 方法阻塞的线程,offer() 方法直接返回(不包含最大等待时长),不参与唤醒场景

下面我们来看 put() 阻塞方法的实现:

public void put(E e) throws InterruptedException {

if (e == null) throw new NullPointerException();

int c = -1;

Node node = new Node(e);

final ReentrantLock putLock = this.putLock;

final AtomicInteger count = this.count;

putLock.lockInterruptibly();

try {

// 队列满时阻塞

while (count.get() == capacity) {

notFull.await();

}

// 入队

enqueue(node);

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

}

从代码可以看出,put() 方法和 offer() 方法唯一区别在于自身通过 condition 阻塞挂起到等待队列,其余基本相同。至此关于添加操作介绍完毕,下面我们看移除方法:

public boolean remove(Object o) {

if (o == null) return false;

// 同时加两个锁

fullyLock();

try {

// 循环查找

for (Node trail = head, p = trail.next; p != null; trail = p, p = p.next) {

if (o.equals(p.item)) {

unlink(p, trail);

return true;

}

}

return false;

} finally {

fullyUnlock();

}

}

void unlink(Node p, Node trail) {

// p是要溢出的节点,trail是它的前驱节点

// 方便gc

p.item = null;

// 引用取消

trail.next = p.next;

if (last == p)

last = trail;

if (count.getAndDecrement() == capacity)

notFull.signal();

}

void fullyLock() {

putLock.lock();

takeLock.lock();

}

void fullyUnlock() {

takeLock.unlock();

putLock.unlock();

}

从代码可以看出,remove() 方法只会在操作前容量不满时唤醒创建线程,并不会唤醒移除线程。并且由于我们不确定要删除元素的位置,因此此时需要加两个锁,确保数据安全。

public E poll() {

final AtomicInteger count = this.count;

if (count.get() == 0)

return null;

E x = null;

int c = -1;

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();

try {

if (count.get() > 0) {

x = dequeue();

// 获取移除前队列的元素数量

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

}

} finally {

takeLock.unlock();

}

// 移除前如果队列是满的,唤醒添加线程

if (c == capacity)

signalNotFull();

return x;

}

private E dequeue() {

Node h = head;

// 获取要删除的节点

Node first = h.next;

// 清除原来的头结点(方便gc)

h.next = h;

// 设置新的头结点

head = first;

// 获取返回值

E x = first.item;

// 新头结点置为空

first.item = null;

return x;

}

需要注意的一点,每次出队时更换 head 节点,head 节点本身不保存数据,head.next 记录下次需要出队的元素,每次出队后 head.next 变为新的 head 节点返回并置为 null

poll() 方法和上面提到的 offer() 方法基本镜像相同,这里我再不做过多赘述

public E take() throws InterruptedException {

E x;

int c = -1;

final AtomicInteger count = this.count;

final ReentrantLock takeLock = this.takeLock;

takeLock.lockInterruptibly();

try {

// 队列为空就挂起

while (count.get() == 0) {

notEmpty.await();

}

x = dequeue();

c = count.getAndDecrement();

if (c > 1)

notEmpty.signal();

} finally {

takeLock.unlock();

}

if (c == capacity)

signalNotFull();

return x;

}

take() 方法和 poll() 方法类似,区别在于新增了阻塞逻辑。至此关于溢出元素方法介绍完毕,最后我们看看查询方法源码:

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node(null);

}

public E element() {

E x = peek();

if (x != null)

return x;

else

throw new NoSuchElementException();

}

public E peek() {

if (count.get() == 0)

return null;

final ReentrantLock takeLock = this.takeLock;

takeLock.lock();

try {

Node first = head.next;

if (first == null)

return null;

else

return first.item;

} finally {

takeLock.unlock();

}

}

从代码可以看出,默认 head 和 last 头尾节点都为 null,入队时直接从 next 开始操作,也就是说 head 节点不保存数据。

最后我们来看看有最大等待时长的 offer() 方法:

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

if (e == null) throw new NullPointerException();

// 将时间转换成纳秒

long nanos = unit.toNanos(timeout);

int c = -1;

// 获取锁

final ReentrantLock putLock = this.putLock;

// 获取当前队列大小

final AtomicInteger count = this.count;

// 可中断锁

putLock.lockInterruptibly();

try {

while (count.get() == capacity) {

// 小于0说明已到达最大等待时长

if (nanos <= 0)

return false;

// 如果队列已满,根据等待队列阻塞等待

nanos = notFull.awaitNanos(nanos);

}

// 队列没满直接入队

enqueue(new Node(e));

c = count.getAndIncrement();

if (c + 1 < capacity)

notFull.signal();

} finally {

putLock.unlock();

}

if (c == 0)

signalNotEmpty();

return true;

}

public final long awaitNanos(long nanosTimeout) throws InterruptedException {

if (Thread.interrupted()) throw new InterruptedException();

// 将当前线程封装为 AQS Node 类加入等待队列

Node node = addConditionWaiter();

// 释放锁

int savedState = fullyRelease(node);

//计算过期时间

final long deadline = System.nanoTime() + nanosTimeout;

int interruptMode = 0;

// 当前线程没有唤醒进入同步队列时

while (!isOnSyncQueue(node)) {

// 已经等待相应时间,删除当前节点,将状态设置为已关闭从队列删除

if (nanosTimeout <= 0L) {

transferAfterCancelledWait(node);

break;

}

// 判断是否超时

if (nanosTimeout >= spinForTimeoutThreshold)

// 挂起线程

LockSupport.parkNanos(this, nanosTimeout);

// 判断线程状态是否被中断

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

// 重新计算剩余等待时间

nanosTimeout = deadline - System.nanoTime();

}

// 被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter !=http:// null)

// 清理等待队列中不为Condition状态的线程

unlinkCancelledWaiters();

// 判断是否被中断

if (interruptMode != 0)

// 抛出异常或中断线程,独占模式抛出异常,共享模式中断线程

reportInterruptAfterWait(interruptMode);

// 返回时差,如果成功当前时间小于最大等待时长,返回值大于0,否则返回值小于0

return deadline - System.nanoTime();

}

从代码可以看出,包含最大等待时长的 offer()、poll() 方法通过循环判断时间是否超时的方式挂起在等待队列,达到最大等待时长还未被唤醒或没被执行就返回

ArrayBlockingQueue 和 LinkedBlockingQueue 对比:

大小不同,一个有界,一个无界。ArrayBlockingQueue 必须指定初始大小,LinkedBlockingQueue 无界时可能内存溢出

一个采用数组,一个采用链表,数组保存无须创建新对象,链表需创建 Node 对象

锁机制不同,ArrayBlockingQueue 添加删除操作使用同一个锁,两者操作不能并发执行。LinkedBlockingQueue 添加和删除使用不同锁,添加和删除操作可并发执行,整体效率 LinkedBlockingQueue 更高

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:自有物流查询(物流查询自动查询)
下一篇:详解记录Java Log的几种方式
相关文章

 发表评论

暂时没有评论,来抢沙发吧~