在产业去中心化大潮下,体育营销如何变得更有效!(体育营销的宏观环境)
296
2022-07-19
Go语言之并发
Go语言直接支持内置支持并发。当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。
Go语言运行时的调度器是一个复杂的软件,这个调度器在操作系统之上。操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine。
Go语言的并发同步逻辑来自一个叫做通信顺讯进程(CSP)的范型。CSP是一种消息传递模型,通过在goroutine之间传递数据来传递消息,而不是通过对数据进行加锁来实现同步访问。这种数据的类型叫做通道(channel) 。
并发与并行
在操作系统中,一个应用程序就可以看作一个进程,而每个进程至少包含一个线程。每个进程的初始线程被称为主线程。
操作系统会在物理处理器(CPU)上调度线程来运行,而Go语言会在逻辑处理器来调度goroutine来运行。1.5版本之上,Go语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器。1.5之前,默认给整个应用程序只分配一个逻辑处理器。
如下图,在运行时把goroutine调度到逻辑处理器上运行,逻辑处理器绑定到唯一的操作系统线程。
当goroutine执行了一个阻塞的系统调用(就是一个非纯CPU的任务)时,调度器会将这个线程与处理器分离,并创建一个新线程来运行这个处理器上提供的服务。
语言运行默认限制每个程序最多创建10000个线程。
注意并发≠并行!并行需要至少2个逻辑处理器。
goroutine
以并发的形式分别显示大写和小写的英文字母
package main
import (
"fmt"
"runtime"
"sync"
func main() {
// 分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)
// wg用来等待程序完成
var wg sync.WaitGroup
// 计数器加2,表示要等待两个goroutine
wg.Add(2)
fmt.Println("Start!")
// 声明一个匿名函数,并创建一个goroutime
go func() {
// 通知main函数工作已经完成
defer wg.Done()
21: // 显示字母表3次
22: for count:=0; count<3;count++ {
23: for char:='a';char<'a'+26;char++ {
24: fmt.Printf("%c ", char)
25: }
26: }
27: }()
28: // 同上
29: go func() {
30: // 通知main函数工作已经完成
31: defer wg.Done()
32: // 显示字母表3次
33: for count:=0; count<3;count++ {
34: for char:='A';char<'A'+26;char++ {
35: fmt.Printf("%c ", char)
36: }
37: }
38: }()
39: // 等待goroutine结束
40: fmt.Println("Waiting!")
41: wg.Wait()
42: fmt.Println("\nFinish!")
43: }
运行结果后,可以看到先输出的是所有的大写字母,最后才是小写字母。是因为第一个goroutine完成所有显示需要花时间太短了,以至于在调度器切换到第二个goroutine之前,就完成了所有任务。
调度器为了防止某个goroutine长时间占用逻辑处理器,会停止当前正运行的goroutine,运行其他可运行的goroutine运行的机会。
创建两个相同的长时间才能完成其工作的goroutine就可以看到,比如说显示5000以内的素数值。
代码结构如下
1: go printPrime("A")
2: go printPrime("B")
3:
4: func printPrime(prefix string) {
5: ...
6: }
结果类似
1: B:2
2: B:3
3: ...
4: B:4591
5: A:3
6: A:5
7: ...
8: A:4561
9: A:4567
10: B:4603
11: B:4621
12: ...
13: // Completed B
14: A:4457
15: ...
16: // Completed A
如何修改逻辑处理器的数量
1: runtime.GOMAXPROCS(runtime.NUMCPU())
稍微改动下上面的代码,结果就会大不同
1: package main
2:
3: import (
4: "fmt"
5: "runtime"
6: "sync"
7:
8:
9: func main() {
10: // 分配两个逻辑处理器给调度器使用
11: runtime.GOMAXPROCS(2)
12: // wg用来等待程序完成
13: var wg sync.WaitGroup
14: // 计数器加2,表示要等待两个goroutine
15: wg.Add(2)
16: fmt.Println("Start!")
17: // 声明一个匿名函数,并创建一个goroutime
18: go func() {
19: // 通知main函数工作已经完成
20: defer wg.Done()
21: // 显示字母表3次
22: for count:=0; count<10;count++ {
23: for char:='a';char<'a'+26;char++ {
24: fmt.Printf("%c ", char)
25: }
26: }
27: }()
28: // 同上
29: go func() {
30: // 通知main函数工作已经完成
31: defer wg.Done()
32: // 显示字母表3次
33: for count:=0; count<10;count++ {
34: for char:='A';char<'A'+26;char++ {
35: fmt.Printf("%c ", char)
36: }
37: }
38: }()
39: // 等待goroutine结束
40: fmt.Println("Waiting!")
41: wg.Wait()
42: fmt.Println("\nFinish!")
43: }
结果类似下面的(根据CPU单核的性能结果可能结果稍微不一样)
1: Start!
2: Waiting!
3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g
4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s
6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X
7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q
8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
9: Finish!
可以发现,goroutine是并行运行的。
只有在有多个逻辑处理器且可以同时让每个goroutine运行在一个可用的物理处理器上的时候,goroutine才会并行运行。
竞争状态
如果两个或者多个goroutine在没有互相同步的情况下,访问某个共享的资源,并且试图同时读和写这个资源,就处于相互竞争的状态。
在竞争状态,每个goroutine都会覆盖另一个goroutine的工作。这种覆盖发生在goroutine发生切换的时候。
每个goroutien都会创造自己的共享变量副本。当切换到领另一个goroutine时,如果这个变量的值在上一个goroutine发生改变,这个goroutine再次运行时,虽然变量的值改变了,但是由于这个goroutine没有更新自己的那个副本的值,而是继续使用,并且将其存回变量的值,从而覆盖上一个goroutine 的工作。
go build –race用来竞争检测器标志来编译程序
锁住共享资源
原子函数
原子函数能够以底层的枷锁机制来同步访问整型变量和指针。省略部分代码如下:
1: var counter int64
2: go incCounter(1)
3: go incCounter(2)
4: func incCounter(id int) {
5: for count:=0;count<2;count++{
6: //安全地对counter加1
7: atomic.AddInt64(&counter, 1)
8: //当前goroutine从线程退出,并放回队列
9: runtime.Gosched()
10: }
11: }
使用atmoi包的AddInt64函数。这些goroutine都会自动根据所引用的变量做同步处理。
另外两个原子函数是LoadInt64和StoreInt64。用法如下:
1: // shutdown是通知正在执行的goroutine停止工作的标志
2: var shutdown int64
3: var wg sync.WaitGroup
4: // 该停止工作了,安全地设置shutdown标志
5: atomic.StoreInt64(&shutdown, 1)
6: // 等待goroutine结束
7: wg.Wait()
8: // 检测是否停止工作,如果shutdown==1那么goroutine就会终止
9: if atomic.LoadInt64(&shutdown) == 1 {
10: break
11: }
12:
互斥锁
另一种同步访问共享资源的方式是互斥锁。主要代码如下:
1: var (
2: // counter是所有goroutine都要增加其值的变量
3: counter int
4: wg sync.WaitGroup
5: // mutex用来定义一段代码临界区
6: mutex sync.Mutex
7: )
8: func main...
9: // 业务代码
10: func incCounter(id int) {
11: defer wg.Done()
12: for i:=0;i<2;i++ {
13: //同一时期只允许一个goroutine进入
14: mutex.Lock()
15: //大括号并不是必须的
16: {
17: //捕获counter的值
18: value := counter
19: //当前goroutine从线程退出,并返回到队列
20: runtime.Gosched()
21: //增加本地value变量的值
22: value++
23: //将该值保存回counter
24: counter = value
25: }
26: // 释放锁,允许其他正在等待的goroutine
27: mutex.Unlock()
28: }
29: }
通道
通道在goroutine之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。
可以通过通道共享内置类型,命名类型,结构类型和引用类型的值或者指针。
go语言需要使用make来创建一个通道,chan是关键字:
1: // 无缓冲的整型通道
2: unbuffered := make(chan int)
3: // 有缓冲的字符串通道
4: buffered := make(chan string, 10)
向通道发送值
1: buffered := make(chan string, 10)
2: // 通通道发送一个字符串
3: buffered <- "Gopher"
4: // 从通道接收一个字符串
5: value := <-buffered
无缓冲的通道是指在接收前没有能力保存任何值的通道。发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作。如果没有准备好,通道会导致goroutine阻塞等待。所以无缓冲通道保证了goroutine之间同一时间进行数据交换。
1: // 四个goroutine间的接力比赛
2: package main
3:
4: import (
5: "fmt"
6: "sync"
7: "time"
8: )
9
10: var wg sync.WaitGroup
11:
12: func main() {
13: //创建一个无缓冲的通道
14: baton := make(chan int)
15: wg.Add(1)
16: // 第一步跑步者持有接力棒
17: go Runner(baton)
18: // 开始比赛
19: baon <- 1
20: // 等待比赛结束
21: wg.Wait()
22: }
23:
24: // Ruuner模拟接力比赛中的一位跑步者
25: func Runner(baton chan int) {
26: var newRunner int
27: // 等待接力棒
28: runner := <-baton
29: // 开始跑步
30: fmt.Printf("运动员%d带着Baton跑\n", runner)
31: // 创建下一步跑步者
32: if runner != 4{
33: newRunner = runner + 1
34: fmt.Printf("运动员%d上线\n", newRunner)
35: go Runner(baton)
36: }
37: // 围绕跑到跑
38: time.Sleep(100 * time.Millisecond)
39: // 比赛结束了吗?
40: if runner == 4{
41: fmt.Printf("运动员%d完成,比赛结束\n", runner)
42: wg.Done()
43: return
44: }
45: // 将接力棒交给下一位跑步者
46: fmt.Printf("运动员%d与运动员%d交换\n", runner, newRunner)
47: baton <- newRunner
48: }
结果:
1: 运动员1带着Baton跑
2: 运动员2上线
3: 运动员1与运动员2交换
4: 运动员2带着Baton跑
5: 运动员3上线
6: 运动员2与运动员3交换
7: 运动员3带着Baton跑
8: 运动员4上线
9: 运动员3与运动员4交换
10: 运动员4带着Baton跑
11: 运动员4完成,比赛结束
有缓冲的通道则能在接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求goroutine之间必须同时完成发送和接收。只有在通道没有可用缓冲区或者没有要接收的值时,发送或者接收才会阻塞。
1: package main
2:
3: import (
4: "fmt"
5: "math/rand"
6: "sync"
7: "time"
8: )
9:
10: const (
11: // goroutine的数量
12: numberGoroutines = 4
13: // 工作的数量
14: taskLoad = 10
15: )
16:
17: var wg sync.WaitGroup
18:
19: // 初始化随机数种子
20: func init() {
21: rand.Seed(time.Now().Unix())
22: }
23: func main() {
24: // 创建一个有缓冲的通道来管理工作
25: tasks := make(chan string, taskLoad)
26: wg.Add(numberGoroutines)
27: // 增加一组要完成的工作
28: for post:=1;post 29: tasks <- fmt.Sprintf("Task:%d", post) 30: } 31: // 启动goroutine来处理工作 32: for i:=1;i 33: go worker(tasks, i) 34: } 35: // 有工作处理完时关闭通道 36: closetasks) 37: 38: wg.Wait() 39: fmt.Printf("all finished!") 40: 41: } 42: 43: func worker(tasks chan string, worker_id int) { 44: defer wg.Done() 45: 46: for { 47: //等待分配工作 48: task, ok := <-tasks 49: if !ok { 50: //通道变空 51: fmt.Printf("Worker%d shut down\n", worker_id) 52: return 53: } 54: // 开始工作 55: fmt.Printf("Worker%d start %s\n", worker_id, task) 56: 57: // 随机等待一段时间 58: sleep := rand.Int63n(100) 59: time.Sleep(time.Duration(sleep)*time.Millisecond) 60: // 显示完成了工作 61: fmt.Printf("Worker%d Completed %s\n", worker_id, task) 62: } 63: 输出结果: 1: Worker4 start Task:1 2: Worker1 start Task:2 3: Worker2 start Task:3 4: Worker3 start Task:4 5: Worker3 Completed Task:4 6: Worker3 start Task:5 7: Worker4 Completed Task:1 8: Worker4 start Task:6 9: Worker2 Completed Task:3 10: Worker2 start Task:7 11: Worker3 Completed Task:5 12: Worker3 start Task:8 13: Worker Completed Task:7 14: Worker2 start Task:9 15: Worker3 Completed Task:8 16: Worker3 shut down 17: Worker4 Completed Task:6 18: Worker4 shut down 19: Worker1 Completed Task:2 20: Worker1 shut down 21: Worker2 Completed Task:9 22: Worker2 shut down 23: all inished! 由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。 当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。
29: tasks <- fmt.Sprintf("Task:%d", post)
30: }
31: // 启动goroutine来处理工作
32: for i:=1;i 33: go worker(tasks, i) 34: } 35: // 有工作处理完时关闭通道 36: closetasks) 37: 38: wg.Wait() 39: fmt.Printf("all finished!") 40: 41: } 42: 43: func worker(tasks chan string, worker_id int) { 44: defer wg.Done() 45: 46: for { 47: //等待分配工作 48: task, ok := <-tasks 49: if !ok { 50: //通道变空 51: fmt.Printf("Worker%d shut down\n", worker_id) 52: return 53: } 54: // 开始工作 55: fmt.Printf("Worker%d start %s\n", worker_id, task) 56: 57: // 随机等待一段时间 58: sleep := rand.Int63n(100) 59: time.Sleep(time.Duration(sleep)*time.Millisecond) 60: // 显示完成了工作 61: fmt.Printf("Worker%d Completed %s\n", worker_id, task) 62: } 63: 输出结果: 1: Worker4 start Task:1 2: Worker1 start Task:2 3: Worker2 start Task:3 4: Worker3 start Task:4 5: Worker3 Completed Task:4 6: Worker3 start Task:5 7: Worker4 Completed Task:1 8: Worker4 start Task:6 9: Worker2 Completed Task:3 10: Worker2 start Task:7 11: Worker3 Completed Task:5 12: Worker3 start Task:8 13: Worker Completed Task:7 14: Worker2 start Task:9 15: Worker3 Completed Task:8 16: Worker3 shut down 17: Worker4 Completed Task:6 18: Worker4 shut down 19: Worker1 Completed Task:2 20: Worker1 shut down 21: Worker2 Completed Task:9 22: Worker2 shut down 23: all inished! 由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。 当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。
33: go worker(tasks, i)
34: }
35: // 有工作处理完时关闭通道
36: closetasks)
37:
38: wg.Wait()
39: fmt.Printf("all finished!")
40:
41: }
42:
43: func worker(tasks chan string, worker_id int) {
44: defer wg.Done()
45:
46: for {
47: //等待分配工作
48: task, ok := <-tasks
49: if !ok {
50: //通道变空
51: fmt.Printf("Worker%d shut down\n", worker_id)
52: return
53: }
54: // 开始工作
55: fmt.Printf("Worker%d start %s\n", worker_id, task)
56:
57: // 随机等待一段时间
58: sleep := rand.Int63n(100)
59: time.Sleep(time.Duration(sleep)*time.Millisecond)
60: // 显示完成了工作
61: fmt.Printf("Worker%d Completed %s\n", worker_id, task)
62: }
63:
输出结果:
1: Worker4 start Task:1
2: Worker1 start Task:2
3: Worker2 start Task:3
4: Worker3 start Task:4
5: Worker3 Completed Task:4
6: Worker3 start Task:5
7: Worker4 Completed Task:1
8: Worker4 start Task:6
9: Worker2 Completed Task:3
10: Worker2 start Task:7
11: Worker3 Completed Task:5
12: Worker3 start Task:8
13: Worker Completed Task:7
14: Worker2 start Task:9
15: Worker3 Completed Task:8
16: Worker3 shut down
17: Worker4 Completed Task:6
18: Worker4 shut down
19: Worker1 Completed Task:2
20: Worker1 shut down
21: Worker2 Completed Task:9
22: Worker2 shut down
23: all inished!
由于程序和Go语言的调度器有随机的成分,结果每次都会不一样。不过总流程不会大变。
当通道关闭后,goroutine依旧从通道里的缓冲区获取数据,但是不能再向通道里发送数据。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,兵返回一个通道类型的零值。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~