Go语言之并发(go并发编程)

网友投稿 296 2022-07-19

Go语言之并发

Go语言直接支持内置支持并发。当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。

Go语言运行时的调度器是一个复杂的软件,这个调度器在操作系统之上。操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine。

Go语言的并发同步逻辑来自一个叫做通信顺讯进程(CSP)的范型。CSP是一种消息传递模型,通过在goroutine之间传递数据来传递消息,而不是通过对数据进行加锁来实现同步访问。这种数据的类型叫做通道(channel) 。

并发与并行

在操作系统中,一个应用程序就可以看作一个进程,而每个进程至少包含一个线程。每个进程的初始线程被称为主线程。

操作系统会在物理处理器(CPU)上调度线程来运行,而Go语言会在逻辑处理器来调度goroutine来运行。1.5版本之上,Go语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器。1.5之前,默认给整个应用程序只分配一个逻辑处理器。

如下图,在运行时把goroutine调度到逻辑处理器上运行,逻辑处理器绑定到唯一的操作系统线程。

当goroutine执行了一个阻塞的系统调用(就是一个非纯CPU的任务)时,调度器会将这个线程与处理器分离,并创建一个新线程来运行这个处理器上提供的服务。

语言运行默认限制每个程序最多创建10000个线程。

注意并发≠并行!并行需要至少2个逻辑处理器。

goroutine

以并发的形式分别显示大写和小写的英文字母

package main

import (

"fmt"

"runtime"

"sync"

func main() {

// 分配一个逻辑处理器给调度器使用

runtime.GOMAXPROCS(1)

// wg用来等待程序完成

var wg sync.WaitGroup

// 计数器加2,表示要等待两个goroutine

wg.Add(2)

fmt.Println("Start!")

// 声明一个匿名函数,并创建一个goroutime

go func() {

// 通知main函数工作已经完成

defer wg.Done()

21: // 显示字母表3次

22: for count:=0; count<3;count++ {

23: for char:='a';char<'a'+26;char++ {

24: fmt.Printf("%c ", char)

25: }

26: }

27: }()

28: // 同上

29: go func() {

30: // 通知main函数工作已经完成

31: defer wg.Done()

32: // 显示字母表3次

33: for count:=0; count<3;count++ {

34: for char:='A';char<'A'+26;char++ {

35: fmt.Printf("%c ", char)

36: }

37: }

38: }()

39: // 等待goroutine结束

40: fmt.Println("Waiting!")

41: wg.Wait()

42: fmt.Println("\nFinish!")

43: }

运行结果后,可以看到先输出的是所有的大写字母,最后才是小写字母。是因为第一个goroutine完成所有显示需要花时间太短了,以至于在调度器切换到第二个goroutine之前,就完成了所有任务。

调度器为了防止某个goroutine长时间占用逻辑处理器,会停止当前正运行的goroutine,运行其他可运行的goroutine运行的机会。

创建两个相同的长时间才能完成其工作的goroutine就可以看到,比如说显示5000以内的素数值。

代码结构如下

1: go printPrime("A")

2: go printPrime("B")

3:

4: func printPrime(prefix string) {

5: ...

6: }

结果类似

1: B:2

2: B:3

3: ...

4: B:4591

5: A:3

6: A:5

7: ...

8: A:4561

9: A:4567

10: B:4603

11: B:4621

12: ...

13: // Completed B

14: A:4457

15: ...

16: // Completed A

如何修改逻辑处理器的数量

1: runtime.GOMAXPROCS(runtime.NUMCPU())

稍微改动下上面的代码,结果就会大不同

1: package main

2:

3: import (

4: "fmt"

5: "runtime"

6: "sync"

7:

8:

9: func main() {

10: // 分配两个逻辑处理器给调度器使用

11: runtime.GOMAXPROCS(2)

12: // wg用来等待程序完成

13: var wg sync.WaitGroup

14: // 计数器加2,表示要等待两个goroutine

15: wg.Add(2)

16: fmt.Println("Start!")

17: // 声明一个匿名函数,并创建一个goroutime

18: go func() {

19: // 通知main函数工作已经完成

20: defer wg.Done()

21: // 显示字母表3次

22: for count:=0; count<10;count++ {

23: for char:='a';char<'a'+26;char++ {

24: fmt.Printf("%c ", char)

25: }

26: }

27: }()

28: // 同上

29: go func() {

30: // 通知main函数工作已经完成

31: defer wg.Done()

32: // 显示字母表3次

33: for count:=0; count<10;count++ {

34: for char:='A';char<'A'+26;char++ {

35: fmt.Printf("%c ", char)

36: }

37: }

38: }()

39: // 等待goroutine结束

40: fmt.Println("Waiting!")

41: wg.Wait()

42: fmt.Println("\nFinish!")

43: }

结果类似下面的(根据CPU单核的性能结果可能结果稍微不一样)

1: Start!

2: Waiting!

3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g

4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z

5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s

6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X

7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q

8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z

9: Finish!

可以发现,goroutine是并行运行的。

只有在有多个逻辑处理器且可以同时让每个goroutine运行在一个可用的物理处理器上的时候,goroutine才会并行运行。

竞争状态

如果两个或者多个goroutine在没有互相同步的情况下,访问某个共享的资源,并且试图同时读和写这个资源,就处于相互竞争的状态。

在竞争状态,每个goroutine都会覆盖另一个goroutine的工作。这种覆盖发生在goroutine发生切换的时候。

每个goroutien都会创造自己的共享变量副本。当切换到领另一个goroutine时,如果这个变量的值在上一个goroutine发生改变,这个goroutine再次运行时,虽然变量的值改变了,但是由于这个goroutine没有更新自己的那个副本的值,而是继续使用,并且将其存回变量的值,从而覆盖上一个goroutine 的工作。

go build –race用来竞争检测器标志来编译程序

锁住共享资源

原子函数

原子函数能够以底层的枷锁机制来同步访问整型变量和指针。省略部分代码如下:

1: var counter int64

2: go incCounter(1)

3: go incCounter(2)

4: func incCounter(id int) {

5: for count:=0;count<2;count++{

6: //安全地对counter加1

7: atomic.AddInt64(&counter, 1)

8: //当前goroutine从线程退出,并放回队列

9: runtime.Gosched()

10: }

11: }

使用atmoi包的AddInt64函数。这些goroutine都会自动根据所引用的变量做同步处理。

另外两个原子函数是LoadInt64和StoreInt64。用法如下:

1: // shutdown是通知正在执行的goroutine停止工作的标志

2: var shutdown int64

3: var wg sync.WaitGroup

4: // 该停止工作了,安全地设置shutdown标志

5: atomic.StoreInt64(&shutdown, 1)

6: // 等待goroutine结束

7: wg.Wait()

8: // 检测是否停止工作,如果shutdown==1那么goroutine就会终止

9: if atomic.LoadInt64(&shutdown) == 1 {

10: break

11: }

12:

互斥锁

另一种同步访问共享资源的方式是互斥锁。主要代码如下:

1: var (

2: // counter是所有goroutine都要增加其值的变量

3: counter int

4: wg sync.WaitGroup

5: // mutex用来定义一段代码临界区

6: mutex sync.Mutex

7: )

8: func main...

9: // 业务代码

10: func incCounter(id int) {

11: defer wg.Done()

12: for i:=0;i<2;i++ {

13: //同一时期只允许一个goroutine进入

14: mutex.Lock()

15: //大括号并不是必须的

16: {

17: //捕获counter的值

18: value := counter

19: //当前goroutine从线程退出,并返回到队列

20: runtime.Gosched()

21: //增加本地value变量的值

22: value++

23: //将该值保存回counter

24: counter = value

25: }

26: // 释放锁,允许其他正在等待的goroutine

27: mutex.Unlock()

28: }

29: }

通道

通道在goroutine之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。

可以通过通道共享内置类型,命名类型,结构类型和引用类型的值或者指针。

go语言需要使用make来创建一个通道,chan是关键字:

1: // 无缓冲的整型通道

2: unbuffered := make(chan int)

3: // 有缓冲的字符串通道

4: buffered := make(chan string, 10)

向通道发送值

1: buffered := make(chan string, 10)

2: // 通通道发送一个字符串

3: buffered <- "Gopher"

4: // 从通道接收一个字符串

5: value := <-buffered

无缓冲的通道是指在接收前没有能力保存任何值的通道。发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。如果没有准备好,通道会导致goroutine阻塞等待。所以无缓冲通道保证了goroutine之间同一时间进行数据交换。

1: // 四个goroutine间的接力比赛

2: package main

3:

4: import (

5: "fmt"

6: "sync"

7: "time"

8: )

9

10: var wg sync.WaitGroup

11:

12: func main() {

13: //创建一个无缓冲的通道

14: baton := make(chan int)

15: wg.Add(1)

16: // 第一步跑步者持有接力棒

17: go Runner(baton)

18: // 开始比赛

19: baon <- 1

20: // 等待比赛结束

21: wg.Wait()

22: }

23:

24: // Ruuner模拟接力比赛中的一位跑步者

25: func Runner(baton chan int) {

26: var newRunner int

27: // 等待接力棒

28: runner := <-baton

29: // 开始跑步

30: fmt.Printf("运动员%d带着Baton跑\n", runner)

31: // 创建下一步跑步者

32: if runner != 4{

33: newRunner = runner + 1

34: fmt.Printf("运动员%d上线\n", newRunner)

35: go Runner(baton)

36: }

37: // 围绕跑到跑

38: time.Sleep(100 * time.Millisecond)

39: // 比赛结束了吗?

40: if runner == 4{

41: fmt.Printf("运动员%d完成,比赛结束\n", runner)

42: wg.Done()

43: return

44: }

45: // 将接力棒交给下一位跑步者

46: fmt.Printf("运动员%d与运动员%d交换\n", runner, newRunner)

47: baton <- newRunner

48: }

结果:

1: 运动员1带着Baton跑

2: 运动员2上线

3: 运动员1与运动员2交换

4: 运动员2带着Baton跑

5: 运动员3上线

6: 运动员2与运动员3交换

7: 运动员3带着Baton跑

8: 运动员4上线

9: 运动员3与运动员4交换

10: 运动员4带着Baton跑

11: 运动员4完成,比赛结束

有缓冲的通道则能在接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。只有在通道没有可用缓冲区或者没有要接收的值时,发送或者接收才会阻塞。

1: package main

2:

3: import (

4: "fmt"

5: "math/rand"

6: "sync"

7: "time"

8: )

9:

10: const (

11: // goroutine的数量

12: numberGoroutines = 4

13: // 工作的数量

14: taskLoad = 10

15: )

16:

17: var wg sync.WaitGroup

18:

19: // 初始化随机数种子

20: func init() {

21: rand.Seed(time.Now().Unix())

22: }

23: func main() {

24: // 创建一个有缓冲的通道来管理工作

25: tasks := make(chan string, taskLoad)

26: wg.Add(numberGoroutines)

27: // 增加一组要完成的工作

28: for post:=1;post

29: tasks <- fmt.Sprintf("Task:%d", post)

30: }

31: // 启动goroutine来处理工作

32: for i:=1;i

33: go worker(tasks, i)

34: }

35: // 有工作处理完时关闭通道

36: closetasks)

37:

38: wg.Wait()

39: fmt.Printf("all finished!")

40:

41: }

42:

43: func worker(tasks chan string, worker_id int) {

44: defer wg.Done()

45:

46: for {

47: //等待分配工作

48: task, ok := <-tasks

49: if !ok {

50: //通道变空

51: fmt.Printf("Worker%d shut down\n", worker_id)

52: return

53: }

54: // 开始工作

55: fmt.Printf("Worker%d start %s\n", worker_id, task)

56:

57: // 随机等待一段时间

58: sleep := rand.Int63n(100)

59: time.Sleep(time.Duration(sleep)*time.Millisecond)

60: // 显示完成了工作

61: fmt.Printf("Worker%d Completed %s\n", worker_id, task)

62: }

63:

输出结果:

1: Worker4 start Task:1

2: Worker1 start Task:2

3: Worker2 start Task:3

4: Worker3 start Task:4

5: Worker3 Completed Task:4

6: Worker3 start Task:5

7: Worker4 Completed Task:1

8: Worker4 start Task:6

9: Worker2 Completed Task:3

10: Worker2 start Task:7

11: Worker3 Completed Task:5

12: Worker3 start Task:8

13: Worker Completed Task:7

14: Worker2 start Task:9

15: Worker3 Completed Task:8

16: Worker3 shut down

17: Worker4 Completed Task:6

18: Worker4 shut down

19: Worker1 Completed Task:2

20: Worker1 shut down

21: Worker2 Completed Task:9

22: Worker2 shut down

23: all inished!

由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。

当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。

29: tasks <- fmt.Sprintf("Task:%d", post)

30: }

31: // 启动goroutine来处理工作

32: for i:=1;i

33: go worker(tasks, i)

34: }

35: // 有工作处理完时关闭通道

36: closetasks)

37:

38: wg.Wait()

39: fmt.Printf("all finished!")

40:

41: }

42:

43: func worker(tasks chan string, worker_id int) {

44: defer wg.Done()

45:

46: for {

47: //等待分配工作

48: task, ok := <-tasks

49: if !ok {

50: //通道变空

51: fmt.Printf("Worker%d shut down\n", worker_id)

52: return

53: }

54: // 开始工作

55: fmt.Printf("Worker%d start %s\n", worker_id, task)

56:

57: // 随机等待一段时间

58: sleep := rand.Int63n(100)

59: time.Sleep(time.Duration(sleep)*time.Millisecond)

60: // 显示完成了工作

61: fmt.Printf("Worker%d Completed %s\n", worker_id, task)

62: }

63:

输出结果:

1: Worker4 start Task:1

2: Worker1 start Task:2

3: Worker2 start Task:3

4: Worker3 start Task:4

5: Worker3 Completed Task:4

6: Worker3 start Task:5

7: Worker4 Completed Task:1

8: Worker4 start Task:6

9: Worker2 Completed Task:3

10: Worker2 start Task:7

11: Worker3 Completed Task:5

12: Worker3 start Task:8

13: Worker Completed Task:7

14: Worker2 start Task:9

15: Worker3 Completed Task:8

16: Worker3 shut down

17: Worker4 Completed Task:6

18: Worker4 shut down

19: Worker1 Completed Task:2

20: Worker1 shut down

21: Worker2 Completed Task:9

22: Worker2 shut down

23: all inished!

由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。

当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。

33: go worker(tasks, i)

34: }

35: // 有工作处理完时关闭通道

36: closetasks)

37:

38: wg.Wait()

39: fmt.Printf("all finished!")

40:

41: }

42:

43: func worker(tasks chan string, worker_id int) {

44: defer wg.Done()

45:

46: for {

47: //等待分配工作

48: task, ok := <-tasks

49: if !ok {

50: //通道变空

51: fmt.Printf("Worker%d shut down\n", worker_id)

52: return

53: }

54: // 开始工作

55: fmt.Printf("Worker%d start %s\n", worker_id, task)

56:

57: // 随机等待一段时间

58: sleep := rand.Int63n(100)

59: time.Sleep(time.Duration(sleep)*time.Millisecond)

60: // 显示完成了工作

61: fmt.Printf("Worker%d Completed %s\n", worker_id, task)

62: }

63:

输出结果:

1: Worker4 start Task:1

2: Worker1 start Task:2

3: Worker2 start Task:3

4: Worker3 start Task:4

5: Worker3 Completed Task:4

6: Worker3 start Task:5

7: Worker4 Completed Task:1

8: Worker4 start Task:6

9: Worker2 Completed Task:3

10: Worker2 start Task:7

11: Worker3 Completed Task:5

12: Worker3 start Task:8

13: Worker Completed Task:7

14: Worker2 start Task:9

15: Worker3 Completed Task:8

16: Worker3 shut down

17: Worker4 Completed Task:6

18: Worker4 shut down

19: Worker1 Completed Task:2

20: Worker1 shut down

21: Worker2 Completed Task:9

22: Worker2 shut down

23: all inished!

由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。

当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Go语言指针详解,看这一篇文章就够了(go 方法 指针)
下一篇:从Go汇编角度解释for循环的两个疑点(go循环引用解决)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~