文章目录
Golang 并发控制模型
Go语言的并发模型是CSP(通信顺序进程),提倡通过通信来进行内存共享,而不是通过共享内存来实现通信。
控制并发有三种经典的方式,使用 channel 通知实现并发控制、使用 sync 包中的 WaitGroup 实现并发控制、使用 Context 上下文实现并发控制。
一、使用 channel 通知实现并发控制
1、无缓冲通道
无缓冲通道,又叫做阻塞通道。发送方 (goroutine) 和接收方 (gouroutine) 必须是同步的,同时准备好,如果没有同时准备好的话,一方就会一直阻塞住,直到另一方准备好为止。
使用无缓冲通道进行通信,将发送和接收的 goroutine 同步化,因此,无缓冲通道也被称为同步通道。
ch := make(chan int) // 创建无缓冲通道
使用无缓冲通道实现并发控制:
package main
import "fmt"
func recv(c chan int) {
fmt.Println("开始接收")
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 启用 goroutine 从通道接收值
ch <- 10
fmt.Println("发送成功")
}
当子协程从无缓冲 channel 里接收值时,没有发送方,子协程阻塞等待,直到主协程往无缓冲 channel 里发送值,子协程开始执行,然后主协程开始执行。
2、有缓冲通道
只要通道的容量大于0,那么该通道就是有缓冲通道,通道的容量表示通道中能最多存放元素的数量。发送方在缓冲区满的时候阻塞,接收方不阻塞;接收方在缓冲区为空的时候阻塞,发送方不阻塞。
ch := make(chan int, 10) // 创建一个缓冲区为10的有缓冲通道
fmt.Println(len(ch)) // 通过len函数获取当前通道内元素数量
fmt.Println(cap(ch)) // 通过cap函数获取通道的容量
使用缓冲区为 1 的通道实现并发控制:
package main
import (
"fmt"
"time"
)
func recv(c chan int) {
fmt.Println("开始接收")
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int, 1)
ch <- 10
go recv(ch) // 启用 goroutine 从通道接收值
time.Sleep(time.Second)
fmt.Println("发送成功")
}
当主协程往缓冲区为1的 channel 里发送值时,不阻塞,子协程启动,从无缓冲 channel 里接收值,主协程睡眠1秒,等待子协程执行完,主协程在执行。
二、使用 sync 包中的 WaitGroup 实现并发控制
1、sync.WaitGroup
在 sync 包中提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成。
在主协程中调用 Add() 添加需要执行 goroutine 的数量,在每一个 goroutine 执行完成后调用 Done() ,表示这个 goroutine 已经完成,主协程调用 Wait() 阻塞等待所有 goroutine 执行完成,当所有的 goroutine 都执行完成后,主协程返回。
实现原理:sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少,当我们启动 N 个并发任务时,将计数器增加 N,每个任务通过调用Done方法将计数器减1,通过调用Wait()来等待并发任务执行完,当计数器的值为 0 时,表示所有并发任务都已经完成。
sync.WaitGroup有以下三种方法:
- Add(N int) : 计数器 + N
- Done() : 计数器 - 1
- Wait() : 阻塞,直到计数器变为0
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(3)
go hello() // 启动3个goroutine去执行hello函数
go hello()
go hello()
fmt.Println("main goroutine done!")
wg.Wait()
}
扩展:
在Golang官网中,有这么一句话:
A WaitGroup must not be copied after first use.
意思是,在 WaitGroup 第一次使用后,不能被拷贝。
为什么呢???
通过下面的例子我们浅浅分析一下。
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg sync.WaitGroup, i int) {
fmt.Println(i)
wg.Done()
}(wg, i)
}
wg.Wait()
}
提示所有的 goroutine 都已经睡眠了,出现了死锁。这是因为 wg 值拷贝传递到了子 goroutine 中,导致只有 Add 操作,Done 操作是在 wg 的副本执行的, wg 的作用域为子协程,而不是全局,因此主协程就死锁了。
改正方法:
- 指针,将匿名函数中 wg 的传入类型改为 *sync.WaitGroup 。
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(wg *sync.WaitGroup, i int) {
fmt.Println(i)
wg.Done()
}(&wg, i)
}
wg.Wait()
}
- 闭包,将匿名函数中的 wg 的传入参数去掉,在匿名函数中可以直接使用外面的 wg 变量。
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
fmt.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}
2、sync.Once
很多场景下,我们需要确保某些操作在高并发时只执行一次,例如只加载一次配置文件。Go语言中的sync包提供了一个针对只执行一次场景的解决方案 sync.Once。
sync.Once只有一个Do方法,Do(f func()) 。
实现原理:sync.Once 内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成,这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会执行多次。
样例如下:延迟一个开销很大的初始化操作到真正用到它的时候再执行。
package main
import (
"image"
"sync"
)
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
// 加载图片
icons = map[string]image.Image{}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
三、使用 Context 上下文实现并发控制
1、简介
在一些简单场景下使用 channel 和 WaitGroup 已经足够了,但是当面临一些复杂多变的网络并发场景下 channel 和 WaitGroup 显得有些力不从心了。在并发程序中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作。
举个例子:在 Go http包的Server中,每一个请求在都有一个对应的 goroutine 去处理。请求处理函数通常会启动额外的 goroutine 用来访问后端服务,比如数据库和RPC服务,用来处理一个请求的 goroutine 通常需要访问一些与请求特定的数据,比如终端用户的身份认证信息、验证相关的token、请求的截止时间。 当一个请求被取消或超时时,所有用来处理该请求的 goroutine 都应该迅速中断退出,然后系统才能释放这些 goroutine 占用的资源。
所以我们需要一种可以跟踪 goroutine 的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的 Context,称之为上下文非常贴切,它就是 goroutine 的上下文。它包括一个程序的运行环境、现场和快照等。每个程序要运行时,都需要知道当前程序的运行状态,通常Go 将这些封装在一个 Context 里,再将它传给要执行的 goroutine 。context 包主要是用来处理多个 goroutine 之间共享数据,及多个 goroutine 的管理。
context常用的使用场景:
- 一个请求对应多个goroutine之间的数据交互
- 超时控制
- 上下文控制
2、context 包
context 包的核心是 struct Context,接口声明如下:
type Context interface {
// 返回Context的超时时间(超时返回场景)
Deadline() (deadline time.Time, ok bool)
// 在Context超时或取消时(即结束了)返回一个关闭的channel,取消信号
// 如果当前Context超时或取消时,Done方法会返回一个channel,然后其他地方就可以通过判断Done方法是否有返回(channel),如果有则说明Context已结束
// 故其可以作为广播通知其他相关方本Context已结束,请做相关处理。
Done() <-chan struct{}
// 返回Context取消的原因
Err() error
// 返回Context相关数据
Value(key any) any
}
Context 对象是线程安全的(底层数据结构加了互斥锁),你可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行取消操作时,所有 goroutine 都会接收到取消信号。
一个 Context 不能拥有 Cancel 方法,同时我们也只能 Done channel 接收数据。原因是:接收取消信号的函数和发送信号的函数通常不是一个。一个典型的场景是:父操作为子操作操作启动 goroutine,子操作也就不能取消父操作。
3、继承 context
context 包提供了一些函数,协助用户从现有的 Context 对象创建新的 Context 对象。这些 Context 对象形成一棵树:当一个 Context 对象被取消时,继承自它的所有 Context 都会被取消。
Background 是所有 Context 对象树的根,它不能被取消。context 包提供了三种context,分别是普通context、超时context、带值的context:
func Background() Context {
return backgroundCtx{}
}
// 普通context,通常这样调用:
ctx, cancel := context.WithCancel(context.Background())
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// 带超时的context,超时之后会自动close对象的Done,与调用CancelFunc的效果一样
// WithDeadline 明确地设置一个d指定的系统时钟时间,如果超过就触发超时
// WithTimeout 设置一个相对的超时时间,也就是deadline设为timeout加上当前的系统时间
// 因为两者事实上都依赖于系统时钟,所以可能存在微小的误差,所以官方不推荐把超时间隔设置得太小
// 通常这样调用:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
// 带有值的context,没有CancelFunc,所以它只用于值的多goroutine传递和共享
// 通常这样调用:
ctx := context.WithValue(context.Background(), "key", myValue)
func WithValue(parent Context, key, val interface{}) Context
WithCancel 和 WithTimeout 函数会返回继承的 Context 对象, 这些对象可以比它们的父 Context 更早地取消。当请求处理函数返回时,与该请求关联的 Context 会被取消。当使用多个副本发送请求时,可以使用 WithCancel 取消多余的请求。 WithTimeout 在设置对后端服务器请求超时时间时非常有用。WithValue 函数能够将请求作用域的数据与 Context 对象建立关系。
4、context 例子
下面的例子,主要描述的是通过一个 channel 实现一个为循环次数为5的循环。
package main
import (
"context"
"fmt"
"time"
)
func childFunc(cont context.Context, num *int) {
ctx, _ := context.WithCancel(cont)
for {
select {
case <-ctx.Done():
fmt.Println("child Done : ", ctx.Err())
return
}
}
}
func main() {
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("parent Done : ", ctx.Err())
return // returning not to leak the goroutine
case dst <- n:
n++
go childFunc(ctx, &n)
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
for n := range gen(ctx) {
fmt.Println(n)
if n >= 5 {
break
}
}
cancel()
time.Sleep(5 * time.Second)
}
在每一个循环中产生一个goroutine,每一个goroutine中都传入context,在每个goroutine中通过传入 ctx 创建一个子Context,并且通过 select 一直监控该Context的运行情况,当父 Context 退出的时候,代码中并没有明显调用子 Context 的 Cancel 函数,但是分析结果,子 Context 还是被正确合理的关闭了,这是因为,所有基于这个 Context 或者衍生的子 Context 都会收到通知,这时就可以进行清理操作了,最终释放 goroutine,这就优雅的解决了 goroutine 启动后不可控的问题。
5、context 使用原则
- 不要把 context 放在结构体中,要以参数的方式传递。
- 以 context 作为参数的函数方法,应该把 context 作为第一个参数,放在第一位。
- 给一个函数方法传递 context 的时候,不要传递nil,如果不知道传递什么,就使用context.TODO。
- context 的 Value 相关方法应该传递必须的数据,不要什么数据都使用这个传递。
- context 是线程安全的,底层数据结构加了互斥锁,可以放心的在多个goroutine中传递。