Golang 并发控制模型

发布于:2024-08-03 ⋅ 阅读:(53) ⋅ 点赞:(0)

Golang 并发控制模型

  Go语言的并发模型是CSP(通信顺序进程),提倡通过通信来进行内存共享,而不是通过共享内存来实现通信。

  控制并发有三种经典的方式,使用 channel 通知实现并发控制、使用 sync 包中的 WaitGroup 实现并发控制、使用 Context 上下文实现并发控制。

一、使用 channel 通知实现并发控制

1、无缓冲通道

  无缓冲通道,又叫做阻塞通道。发送方 (goroutine) 和接收方 (gouroutine) 必须是同步的,同时准备好,如果没有同时准备好的话,一方就会一直阻塞住,直到另一方准备好为止。

  使用无缓冲通道进行通信,将发送和接收的 goroutine 同步化,因此,无缓冲通道也被称为同步通道。

ch := make(chan int) // 创建无缓冲通道

在这里插入图片描述

使用无缓冲通道实现并发控制:

package main

import "fmt"

func recv(c chan int) {
	fmt.Println("开始接收")
	ret := <-c
	fmt.Println("接收成功", ret)
}

func main() {
	ch := make(chan int)
	go recv(ch) // 启用 goroutine 从通道接收值
	ch <- 10
	fmt.Println("发送成功")
}


在这里插入图片描述

  当子协程从无缓冲 channel 里接收值时,没有发送方,子协程阻塞等待,直到主协程往无缓冲 channel 里发送值,子协程开始执行,然后主协程开始执行。

2、有缓冲通道

  只要通道的容量大于0,那么该通道就是有缓冲通道,通道的容量表示通道中能最多存放元素的数量。发送方在缓冲区满的时候阻塞,接收方不阻塞;接收方在缓冲区为空的时候阻塞,发送方不阻塞。

ch := make(chan int, 10) // 创建一个缓冲区为10的有缓冲通道
fmt.Println(len(ch)) // 通过len函数获取当前通道内元素数量
fmt.Println(cap(ch)) // 通过cap函数获取通道的容量

在这里插入图片描述

使用缓冲区为 1 的通道实现并发控制:

package main

import (
	"fmt"
	"time"
)

func recv(c chan int) {
	fmt.Println("开始接收")
	ret := <-c
	fmt.Println("接收成功", ret)
}

func main() {
	ch := make(chan int, 1)
	ch <- 10
	go recv(ch) // 启用 goroutine 从通道接收值
	time.Sleep(time.Second)
	fmt.Println("发送成功")
}


在这里插入图片描述

  当主协程往缓冲区为1的 channel 里发送值时,不阻塞,子协程启动,从无缓冲 channel 里接收值,主协程睡眠1秒,等待子协程执行完,主协程在执行。

二、使用 sync 包中的 WaitGroup 实现并发控制

1、sync.WaitGroup

  在 sync 包中提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成。

  在主协程中调用 Add() 添加需要执行 goroutine 的数量,在每一个 goroutine 执行完成后调用 Done() ,表示这个 goroutine 已经完成,主协程调用 Wait() 阻塞等待所有 goroutine 执行完成,当所有的 goroutine 都执行完成后,主协程返回。

  实现原理:sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少,当我们启动 N 个并发任务时,将计数器增加 N,每个任务通过调用Done方法将计数器减1,通过调用Wait()来等待并发任务执行完,当计数器的值为 0 时,表示所有并发任务都已经完成。

sync.WaitGroup有以下三种方法:

  • Add(N int) : 计数器 + N
  • Done() : 计数器 - 1
  • Wait() : 阻塞,直到计数器变为0
package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(3)
	go hello() // 启动3个goroutine去执行hello函数
	go hello()
	go hello()
	fmt.Println("main goroutine done!")
	wg.Wait()
}

在这里插入图片描述

扩展:

在Golang官网中,有这么一句话:

A WaitGroup must not be copied after first use.

意思是,在 WaitGroup 第一次使用后,不能被拷贝。

为什么呢???

通过下面的例子我们浅浅分析一下。

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(wg sync.WaitGroup, i int) {
			fmt.Println(i)
			wg.Done()
		}(wg, i)
	}
	wg.Wait()
}

在这里插入图片描述

  提示所有的 goroutine 都已经睡眠了,出现了死锁。这是因为 wg 值拷贝传递到了子 goroutine 中,导致只有 Add 操作,Done 操作是在 wg 的副本执行的, wg 的作用域为子协程,而不是全局,因此主协程就死锁了。

改正方法:

  • 指针,将匿名函数中 wg 的传入类型改为 *sync.WaitGroup 。
func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(wg *sync.WaitGroup, i int) {
			fmt.Println(i)
			wg.Done()
		}(&wg, i)
	}
	wg.Wait()
}
  • 闭包,将匿名函数中的 wg 的传入参数去掉,在匿名函数中可以直接使用外面的 wg 变量。
func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			fmt.Println(i)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

2、sync.Once

  很多场景下,我们需要确保某些操作在高并发时只执行一次,例如只加载一次配置文件。Go语言中的sync包提供了一个针对只执行一次场景的解决方案 sync.Once。

  sync.Once只有一个Do方法,Do(f func()) 。

  实现原理:sync.Once 内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成,这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会执行多次。

  样例如下:延迟一个开销很大的初始化操作到真正用到它的时候再执行。

package main

import (
	"image"
	"sync"
)

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	// 加载图片
	icons = map[string]image.Image{}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

三、使用 Context 上下文实现并发控制

1、简介

  在一些简单场景下使用 channel 和 WaitGroup 已经足够了,但是当面临一些复杂多变的网络并发场景下 channel 和 WaitGroup 显得有些力不从心了。在并发程序中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作。

  举个例子:在 Go http包的Server中,每一个请求在都有一个对应的 goroutine 去处理。请求处理函数通常会启动额外的 goroutine 用来访问后端服务,比如数据库和RPC服务,用来处理一个请求的 goroutine 通常需要访问一些与请求特定的数据,比如终端用户的身份认证信息、验证相关的token、请求的截止时间。 当一个请求被取消或超时时,所有用来处理该请求的 goroutine 都应该迅速中断退出,然后系统才能释放这些 goroutine 占用的资源。

  所以我们需要一种可以跟踪 goroutine 的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的 Context,称之为上下文非常贴切,它就是 goroutine 的上下文。它包括一个程序的运行环境、现场和快照等。每个程序要运行时,都需要知道当前程序的运行状态,通常Go 将这些封装在一个 Context 里,再将它传给要执行的 goroutine 。context 包主要是用来处理多个 goroutine 之间共享数据,及多个 goroutine 的管理。

  context常用的使用场景:

  • 一个请求对应多个goroutine之间的数据交互
  • 超时控制
  • 上下文控制

2、context 包

context 包的核心是 struct Context,接口声明如下:

type Context interface {
	// 返回Context的超时时间(超时返回场景)
	Deadline() (deadline time.Time, ok bool)
	// 在Context超时或取消时(即结束了)返回一个关闭的channel,取消信号
    // 如果当前Context超时或取消时,Done方法会返回一个channel,然后其他地方就可以通过判断Done方法是否有返回(channel),如果有则说明Context已结束
    // 故其可以作为广播通知其他相关方本Context已结束,请做相关处理。
	Done() <-chan struct{}
	// 返回Context取消的原因
	Err() error
	// 返回Context相关数据
	Value(key any) any
}

  Context 对象是线程安全的(底层数据结构加了互斥锁),你可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行取消操作时,所有 goroutine 都会接收到取消信号。

  一个 Context 不能拥有 Cancel 方法,同时我们也只能 Done channel 接收数据。原因是:接收取消信号的函数和发送信号的函数通常不是一个。一个典型的场景是:父操作为子操作操作启动 goroutine,子操作也就不能取消父操作。

3、继承 context

  context 包提供了一些函数,协助用户从现有的 Context 对象创建新的 Context 对象。这些 Context 对象形成一棵树:当一个 Context 对象被取消时,继承自它的所有 Context 都会被取消。

  Background 是所有 Context 对象树的根,它不能被取消。context 包提供了三种context,分别是普通context、超时context、带值的context:

func Background() Context {
	return backgroundCtx{}
}

// 普通context,通常这样调用: 
ctx, cancel := context.WithCancel(context.Background())
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// 带超时的context,超时之后会自动close对象的Done,与调用CancelFunc的效果一样
// WithDeadline 明确地设置一个d指定的系统时钟时间,如果超过就触发超时
// WithTimeout 设置一个相对的超时时间,也就是deadline设为timeout加上当前的系统时间
// 因为两者事实上都依赖于系统时钟,所以可能存在微小的误差,所以官方不推荐把超时间隔设置得太小
// 通常这样调用:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

// 带有值的context,没有CancelFunc,所以它只用于值的多goroutine传递和共享
// 通常这样调用:
ctx := context.WithValue(context.Background(), "key", myValue)
func WithValue(parent Context, key, val interface{}) Context

  WithCancel 和 WithTimeout 函数会返回继承的 Context 对象, 这些对象可以比它们的父 Context 更早地取消。当请求处理函数返回时,与该请求关联的 Context 会被取消。当使用多个副本发送请求时,可以使用 WithCancel 取消多余的请求。 WithTimeout 在设置对后端服务器请求超时时间时非常有用。WithValue 函数能够将请求作用域的数据与 Context 对象建立关系。

4、context 例子

  下面的例子,主要描述的是通过一个 channel 实现一个为循环次数为5的循环。

package main

import (
	"context"
	"fmt"
	"time"
)

func childFunc(cont context.Context, num *int) {
	ctx, _ := context.WithCancel(cont)
	for {
		select {
		case <-ctx.Done():
			fmt.Println("child Done : ", ctx.Err())
			return
		}
	}
}

func main() {
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done():
					fmt.Println("parent Done : ", ctx.Err())
					return // returning not to leak the goroutine
				case dst <- n:
					n++
					go childFunc(ctx, &n)
				}
			}
		}()
		return dst
	}

	ctx, cancel := context.WithCancel(context.Background())
	for n := range gen(ctx) {
		fmt.Println(n)
		if n >= 5 {
			break
		}
	}
	cancel()
	time.Sleep(5 * time.Second)
}

在这里插入图片描述

  在每一个循环中产生一个goroutine,每一个goroutine中都传入context,在每个goroutine中通过传入 ctx 创建一个子Context,并且通过 select 一直监控该Context的运行情况,当父 Context 退出的时候,代码中并没有明显调用子 Context 的 Cancel 函数,但是分析结果,子 Context 还是被正确合理的关闭了,这是因为,所有基于这个 Context 或者衍生的子 Context 都会收到通知,这时就可以进行清理操作了,最终释放 goroutine,这就优雅的解决了 goroutine 启动后不可控的问题。

5、context 使用原则

  • 不要把 context 放在结构体中,要以参数的方式传递
  • 以 context 作为参数的函数方法,应该把 context 作为第一个参数,放在第一位。
  • 给一个函数方法传递 context 的时候,不要传递nil,如果不知道传递什么,就使用context.TODO。
  • context 的 Value 相关方法应该传递必须的数据,不要什么数据都使用这个传递。
  • context 是线程安全的,底层数据结构加了互斥锁,可以放心的在多个goroutine中传递。