Java多线程之同步工具类CountDownLatch

网友投稿 238 2022-11-27

Java多线程之同步工具类CountDownLatch

前言:

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

1 CountDownLatch主要方法

void await():如果当前count大于0,当前线程将会wait,直到count等于0或者中断。 PS:当count等于0的时候,再去调用await() ,

线程将不会阻塞,而是立即运行。后面可以通过源码分析得到。

boolean await(long timeout, TimeUnit unit):使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。

void countDown(): 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。

long getCount() :获得计数的数量

2 CountDownLatch使用例子

public class CountDownLatchTest {

private static final int N = 4;

public static void main(String[] args) {

final CountDownLatch latch = new CountDownLatch(4);

for(int i=0;i

{

new Thread(){

public void run() {

try {

System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");

Thread.sleep(3000);

System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");

latch.countDown();

System.out.println("剩余计数"+latch.getCount());

} catch (InterruptedException e) {

e.printStackTrace();

}

};

}.start();

}

try {

System.out.println("等待"+N+"个子线程执行完毕...");

latch.await();

System.out.println(N+"个子线程已经执行完毕");

System.out.println("继续执行主线程");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

子线程Thread-1正在执行

子线程Thread-3正在执行

子线程Thread-2正在执行

等待4个子线程执行完毕...

子线程Thread-0正在执行

子线程Thread-3执行完毕

子线程Thread-2执行完毕

剩余计数2

子线程Thread-1执行完毕

剩余计数1

子线程Thread-0执行完毕

剩余计数3

剩余计数0

4个子线程已经执行完毕

继续执行主线程

3 CountDownLatch源码分析

CountDownLatch是通过计数器的方式来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务之后,就会对计数器减1,当计数器的值为0时,表示所有线程完成了任务,此时等待在闭锁上的线程才继续执行,从而达到等待其他线程完成任务之后才继续执行的目的。

构造函数

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

通过传入一个数值来创建一个CountDownLatch,数值表示线程可以从等待状态恢复,countDown方法必须被调用的次数

countDown方法

public void countDown() {

sync.releaseShared(1);

}

线程调用此方法对count进行减1。当count本来就为0,此方法不做任何操作,当count比0大,调用此方法进行减1,当new count为0,释放所有等待当线程。

countDown方法的内部实现

/**

* Decrements the count of the latch, releasing all waiting threads if

* the count reaches zero.

*

*

If the current count is greater than zero then it is decremented.

* If the new count is zero then all waiting threads are re-enabled for

* thread scheduling purposes.

*

*

If the current count equals zero then nothing happens.

*/

public void countDown() {

sync.releaseShared(1);

}

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();//释放所有正在等待的线程节点

return true;

}

return false;

}

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextcbrnMvXslkb == 0;

}

}

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

await方法

(1)不带参数

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

调用此方法时,当count为0,直接返回true,当count比0大,线程会一直等待,直到count的值变为0,或者线程被中断(interepted,此时会抛出中断异常)。

(2)带参数

public boolean await(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

调用此方法时,当count为0,直接返回true,当count比0大,线程会等待一段时间,等待时间内如果count的值变为0,返回true;当超出等待时间,返回false;或者等待时间内线程被中断,此时会抛出中断异常。

await()方法的内部实现

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

具体如下:

1、检测中断标志位

2、调用tryAcquireShared方法来检查AQS标志位state是否等于0,如果state等于0,则说明不需要等待,立即返回,否则进行3

3、调用doAcquireSharedInterruptibly方法进入AQS同步队列进行等待,并不断的自旋检测是否需要唤醒

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

/*

函数功能:根据AQS的状态位state来返回值,

如果为state=0,返回 1

如果state=1,则返回-1

*/

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

/**

* Acquires in shared interruptible mode.

* @param arg the acquire argument

*/

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {http://

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {//如果大于零,则说明需要唤醒

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

4 CountDownLatch和CyclicBarrier区别

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它brnMvXslkb才执行;

CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

{

new Thread(){

public void run() {

try {

System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");

Thread.sleep(3000);

System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");

latch.countDown();

System.out.println("剩余计数"+latch.getCount());

} catch (InterruptedException e) {

e.printStackTrace();

}

};

}.start();

}

try {

System.out.println("等待"+N+"个子线程执行完毕...");

latch.await();

System.out.println(N+"个子线程已经执行完毕");

System.out.println("继续执行主线程");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

子线程Thread-1正在执行

子线程Thread-3正在执行

子线程Thread-2正在执行

等待4个子线程执行完毕...

子线程Thread-0正在执行

子线程Thread-3执行完毕

子线程Thread-2执行完毕

剩余计数2

子线程Thread-1执行完毕

剩余计数1

子线程Thread-0执行完毕

剩余计数3

剩余计数0

4个子线程已经执行完毕

继续执行主线程

3 CountDownLatch源码分析

CountDownLatch是通过计数器的方式来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务之后,就会对计数器减1,当计数器的值为0时,表示所有线程完成了任务,此时等待在闭锁上的线程才继续执行,从而达到等待其他线程完成任务之后才继续执行的目的。

构造函数

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

通过传入一个数值来创建一个CountDownLatch,数值表示线程可以从等待状态恢复,countDown方法必须被调用的次数

countDown方法

public void countDown() {

sync.releaseShared(1);

}

线程调用此方法对count进行减1。当count本来就为0,此方法不做任何操作,当count比0大,调用此方法进行减1,当new count为0,释放所有等待当线程。

countDown方法的内部实现

/**

* Decrements the count of the latch, releasing all waiting threads if

* the count reaches zero.

*

*

If the current count is greater than zero then it is decremented.

* If the new count is zero then all waiting threads are re-enabled for

* thread scheduling purposes.

*

*

If the current count equals zero then nothing happens.

*/

public void countDown() {

sync.releaseShared(1);

}

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();//释放所有正在等待的线程节点

return true;

}

return false;

}

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextcbrnMvXslkb == 0;

}

}

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

await方法

(1)不带参数

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

调用此方法时,当count为0,直接返回true,当count比0大,线程会一直等待,直到count的值变为0,或者线程被中断(interepted,此时会抛出中断异常)。

(2)带参数

public boolean await(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}

调用此方法时,当count为0,直接返回true,当count比0大,线程会等待一段时间,等待时间内如果count的值变为0,返回true;当超出等待时间,返回false;或者等待时间内线程被中断,此时会抛出中断异常。

await()方法的内部实现

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

具体如下:

1、检测中断标志位

2、调用tryAcquireShared方法来检查AQS标志位state是否等于0,如果state等于0,则说明不需要等待,立即返回,否则进行3

3、调用doAcquireSharedInterruptibly方法进入AQS同步队列进行等待,并不断的自旋检测是否需要唤醒

public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

/*

函数功能:根据AQS的状态位state来返回值,

如果为state=0,返回 1

如果state=1,则返回-1

*/

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

/**

* Acquires in shared interruptible mode.

* @param arg the acquire argument

*/

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {http://

for (;;) {

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {//如果大于零,则说明需要唤醒

setHeadAndPropagate(node, r);

p.next = null; // help GC

failed = false;

return;

}

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

if (failed)

cancelAcquire(node);

}

}

4 CountDownLatch和CyclicBarrier区别

CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它brnMvXslkb才执行;

CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;

CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:多种短程无线通信标准和规范所带来的设计挑战
下一篇:Problem B: 图形计数与求面积
相关文章

 发表评论

暂时没有评论,来抢沙发吧~