Java多线程中的wait/notify通信模式实例详解

网友投稿 211 2023-02-16

Java多线程中的wait/notify通信模式实例详解

前言

最近在看一些JUC下的源码,更加意识到想要学好java多线程,基础是关键,比如想要学好ReentranLock源码,就得掌握好AQS源码,而AQS源码中又有很多Java多线程经典的一些应用;再比如看了线程池的核心源码实现,又学到了很多核心实现,其实这些都可以提出来慢慢消化并变成自己的知识点,今天这个Java等待/通知模式其实是Thread.join()实现的关键,还有线程池工作线程中线程跟线程之间的通信的核心所在,故在此为了加深理解,做此记录!

参考资料《Java并发编程艺术》(电子PDF版)

一、什么是Java线程的等待/通知模式

1、等待/通知模式概述

首先先介绍下官方的一个正式的介绍:

等待/通知机制,是指一个线程A调用了对象object的wait()方法进入等待状态,而另一个线程B调用了对象object的notify或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而还行后续操作。

而我的理解是(举例说明):

假设工厂里有两条流水线,某个工作流程需要这两个流水线配合完成,这两个流水线分别是A和B,其中A负责准备各种配件,B负责租装配件之后产出输出到工作台。B的工作需要A的配件准备充分,否则就会一直等待A准备好配件,并且A准备好配件后会通过一个开头通知告诉B我已经准备好了,你那边不用一直等待了,可以继续执行任务了。流程A与流程B就是对应的线程A与线程B之间的通信,即可以理解为相互配合,具体也就是“”通知/等待“”机制!

2、需要注意的细节

那么,我们都知道超类Object有wait()方法与notify()/notifyAll()方法,在进行正式代码举例之前,应该先加深下对这三个方法的理解与一些细节(有一些细节确实容易被忽略)

调用wait()方法,会释放锁(这一点我想大部分人都知道),线程状态由RUNNING->WAITNG,当前线程进入对象等待队列中;

调用notify()/notifyAll()方法不会立马释放锁(这一点我大家人也应该知道,但是什么时候释放锁呢?--------请看下一条),notify()方法是将等待队列中的线程移到同步队列中,而notifyAll()则是全部移到同步队列中,被移出的线程状态WAITING-->BLOCKED;

当前调用notify()/notifyAll()的线程释放锁了才算释放锁,才有机会唤醒wait线程返回(为什么有才有机会返回呢?------继续看下一条)

从wait()返回的前提是必须获得调用对象锁,也就是说notify()与notifyAll()释放锁之后,wait()进入BLOCKED状态,如果其他线程有竞争当前锁的话,wait线程继续争取锁资格(不好理解的话,请看下面的代码举例)

使用wait()、notify()、notifyAll()方法时需要先调对象加锁(这可能是最容易忽视的点了,至于为什么,请先看了代码之后,看本篇博文最后补充:wait()、notify()、notifyAll()加锁的原因----防止线程即饥饿)

二、代码举例

1、结合代码理解

结合上述的“工厂流程装配配件并产出的例子”,我

们有两个线程(流水线)WaitThread与NotifyThread、其中WaitThread是被通知的任务,完成主要的工作(组装配件完成

产品),需要时刻判断标志位(开关);NotifyThread是需要通知的任务,需要对WaitThread进行“监督通知”,两个配合才能更好完成产品的组装并输出。

public class WaitNotify {

static Object lock = new Object();

static boolean flag = false;

public static void main(String[] args) {

new Thread(new WaitThread(), "WaitThread").start();

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

new Thread(new NotifyThread(), "NotifyThread").start();

}

/**

* 流水线A,完成主要任务

*/

static class WaitThread implements Runnable{

@Override

public void run() {

// 获取object对象锁

synchronized (lock){

// 条件不满足时一直在等,等另外的线程改变该条件,并通知该wait线程

while (!flag){

try {

System.out.println(Thread.currentThread() + " is waiting, flag is "+flag);

// wait()方法调用就会释放锁,当前线程进入等待队列。

lock.wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// TODO 条件已经满足,不继续while,完成任务

System.out.println(Thread.currentThread() + " is running, flag is "+flag);

}

}

}

/**

* 流水线B,对开关进行控制,并通知流水线A

*/

static class NotifyThread implements Runnable{

@Override

public void run() {

// 获取等wait线程同一个object对象锁

synchronized (lock){

flag = true;

// 通知wait线程,我已经改变了条件,你可以继续返回执行了(返回之后继续判断while)

// 但是此时通知notify()操作并立即不会释放锁,而是要等当前线程释放锁

// TODO 我准备好配件了,我需要通知全部的组装流水线A.....

lock.notifyAll();

System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag);

}

}

}

}

运行main函数,输出:

Thread[WaitThread,5,main] is waiting, flag is false

Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true

Thread[WaitThread,5,main] is running, flag is true

车床流水工作开启,流水线的开关一开始是关闭的(flag=false),流水线B(NotifyThread)去开启后,开始自动唤醒流水线A(WaitThread),整个流水线开始工作了......

Thread[WaitThread,5,main] is waiting, flag is false: 一开始流水线A发现自己没有配件可租装,所以等流水线A准备好配件(这样是不是觉得特别傻,哈哈哈,真正的流水线不会浪费时间等的,而且会有很多条流水线B准备配件的,这里只是举例说明,望理解!);

Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true:流水线B准备好了配件,开启开关(flag=ture),并通知流水线A,让流水线A开始工作;

Thread[WaitThread,5,main] is running, flag is true,流水线B收到了通知,再次检查开关是否开启了,开启的话就开始返回继续完成工作了。

其实结合上述我举的例子还是很好理解的,下面是大概的一个粗略时序图:

2、扩展理解----wait()返回的前提是获得了锁

上述已经表达了这个注意的细节:从wait()返回的前提是必须获得调用对象锁,我们再增加能竞争lock的同步代码块(红字部分)。

public class WaitNotify {

static Object lock = new Object();

static boolean flag = false;

public static void main(String[] args) {

new Thread(new WaitThread(), "WaitThread").start();

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

new Thread(new NotifyThread(), "NotifyThread").start();

}

/**

* 流水线A,完成主要任务

*/

static class WaitThread implements Runnable{

@Override

public void run() {

// 获取object对象锁

synchronized (lock){

// 条件不满足时一直在等,等另外的线程改变该条件,并通知该wait线程

while (!flag){

try {

System.out.println(Thread.currentThread() + " is waiting, flag is "+flag);

// wait()方法调用就会释放锁,当前线程进入等待队列。

lock.wait();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// TODO 条件已经满足,不继续while,完成任务

System.out.println(Thread.currentThread() + " is running, flag is "+flag);

}

}

}

/**

* 流水线B,对开关进行控制,并通知流水线A

*/

static class NotifyThread implements Runnable{

@Override

public void run() {

// 获取等wait线程同一个object对象锁

synchronized (lock){

flag = true;

// 通知wait线程,我已经改变了条件,你可以继续返回执行了(返回之后继续判断while)

// 但是此时通知notify()操作并立即不会释放锁,而是要等当前线程释放锁

// TODO 我准备好配件了,我需要通知全部的组装流水线A.....

lock.notifyAll();

System.out.println(Thread.currentThread() + " hold lock, notify waitThread and flag is "+flag);

}

// 模拟跟流水线B竞争

synchronized (lock){

System.out.println(Thread.currentThread() + " hold lock again");

}

}

}

}

输出结果:

Thread[WaitThread,5,main] is waiting, flag is false

Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true

Thread[NotifyThread,5,main] hold lock again

Thread[WaitThread,5,main] is running, flag is true

其中第三条跟第四条顺序可能会反着来的,这就是因为lock锁可能被红字部分的synchronized代码块竞争获取(这样wait()方法可能获取不到lock锁,不会返回),也可能被waitThread获取从wait()方法返回。

Thread[WaitThread,5,main] is waiting, flag is false

Thread[NotifyThread,5,main] hold lock, notify waitThread and flag is true

Thread[WaitThread,5,main] is running, flag is true

Thread[NotifyThread,5,main] hold lock again

三、等待/通知模式的应用

1、Thread.join()中源码应用

Thread.join()作用:当线程A等待thread线程终止之后才从thread.join()返回, 每个线程终止的前提是前驱线程终止,每个线程等待前驱线程终止后,才从join方法返回,这里涉及了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)。

Thread.join()源码中,使用while选好判断前驱线程是否活着,如果前驱线程还活着就一直wait等待,当然如果超时的话就直接返回。

public final synchronized void join(long millis)

throws InterruptedException {

long base = System.currentTimeMillis();

long now = 0;

if (millis < 0) {

throw new IllegalArgumentException("timeout value is negative");

}

// 这里的while(){wait(millis)} 就是利用等待/通知中的等待模式,只不过加上了超时设置

if (millis == 0) {

// while循环,当线程还活着的时候就一直循环等待,直到线程终止

while (isAlive()) {

// wait等待

wait(0);

}

// 条件满足时返回

} else {

while (isAlive()) {

long delay = millis - now;

if (delay <= 0) {

break;

}

wait(delay);

now = System.currentTimeMillis() - base;

}

}

}

2、其它的应用

工作队列中线程job没有的话也就是工作队列为空的情况下,等待客户端放入工作队列线程任务,并通知工作线程继续从工作队列中获取线程执行。

注:关于线程池的应用源码这里不做介绍,因为一时也讲不完(自己也还没有完全消化),先简单介绍下应用到的地方还有概念。

补充:其实数据库的连接池也类似线程池这种工作流程,也会涉及等待/通知模式。

3、等待/通知范式

介绍了那么多应用,这种模式应该有个统一的范式来套用。对的,必然是有的:

对于等待者(也可以称之为消费者):

synchronized (对象lock) {

while (条件不满足) {

对象.wait();

}

// TODO 处理逻辑

}

对于通知者(也可以称之为生产者):

synchronized (对象lock) {

while (条件满足) {

改变条件

对象.notify();

}

}

注意:实际开发中最好采用的是超时等待/通知模式,在thread.join()源码方法中完美体现

四、wait()、notify()、notifyAll()使用前需要加锁的原因----防止线程即饥饿

(1)其实根据wait()注意事项也能明白,wait()是释放锁的,那么不加锁哪来释放锁!

2)wait()与notify()或者notifyAll()必须是搭配一起使用的,否则线程调用object.wait()之后,没有超时机制,也没有调用notify()或者notifyAll()唤醒的话,就一直处于WAITING状态,造成调用wait()的线程一直都是饥饿状态。

(3)由于第2条的,我们已知:即便我们使用了notify()或者notifyAll()去唤醒线程,但是没有在适当的时机唤醒(比如调用wait()之前就唤醒了),那么仍然调用wait()线程处于WAITING状态,所以我们必须保证wait()方法要么不执行,要么就执行完在被唤醒。也就是下列代码中1那里不能允许插入调用notify/notifyAll,自然而然就增加synchronized关键字,保证wait()操作整体执行不被破坏!

synchronized (对象lock) {

while (条件不满足) {

// 1 这里如果先执行了notify/notifyAll方法,那么2执行之后,该线程就一直WAITING

对象.wait(); // 2

}

// TODO 处理逻辑

}

用图片展示执行顺序就是:

(4)注意synchronized代码块中,代码错误或者其它原因线程终止的话,没有执行到wait()方法的话,是会自动释放锁的,不必担心会死锁。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SVN导入maven项目报错解决方案
下一篇:Java 包和访问权限操作
相关文章

 发表评论

暂时没有评论,来抢沙发吧~