Go基础学习09-多协程资源竞争、sync.Mutex、sync.Cond、chan在多协程对共享变量的资源竞争中的使用

发布于:2024-10-09 ⋅ 阅读:(37) ⋅ 点赞:(0)

Go中协程基础小记

协程基础

在Go中,线程被程为goroutine,也有人称其为线程的线程(a thread of thread)。每个线程都有自己的单独的变量如:

PC程序计数器、函数调用栈Stack、一套寄存器Registers。

在Go中协程对应的primitive原语有:

  • start/go:启动/运行一个线程
  • exit:线程退出,一般从某个函数退出/结束执行后,会自动隐式退出
  • stop:停止一个线程,比如向一个没有读者的channel写数据,那么channel阻塞,go可能会运行时暂时停止这个线程
  • resume:恢复原本停止的线程重新执行,需要恢复程序计数器(program counter)、栈指针(stack pointer)、寄存器(register)状态,让处理器继续运行该线程。

为什么需要多协程

依靠多协程达到并发的效果:

  • I/O concurrency:I/O并发。
  • multi-core parallelism:多核并行,提高整体吞吐量。充分利用CPU。
  • convinience:方便,经常有需要异步执行or定时执行的任务,可以通过线程完成。

多协程面临的问题

  • race conditions:多线程会引入竞态条件的场景
    • avoid sharing:避免共享内存以防止竞态条件场景的产生(Go有一个竞态检测器race detector,能够辅助识别代码中的一些竞态条件场景)
    • use locks:让一系列指令变成原子操作
  • coordination:同步协调问题,比如一个线程的执行依赖另一个线程的执行结果等
    • channels:通道允许同时通信和协调
    • condition variables:配合互斥锁使用
  • deadlock:死锁问题,比如在go中简单的死锁场景,一个写线程往channel写数据,但是永远没有读线程从channel读数据,那么写线程被永久阻塞,即死锁,go会抓住这种场景,抛出运行时错误runtime error。

go中还存在一个协程泄漏的问题:参考下面代码示例第四个:channel的使用。

代码演示

存在共享变量的资源竞争

// 定义一个工具类,不具有实际意义。
package main

import "math/rand"

func requestVote() bool {
	intn := rand.Intn(10)
	if intn <= 5 {
		return true
	}
	return false
}

// 基础代码
package main

func main() {

	count := 0
	finished := 0

	for i := 0; i < 10; i++ {
		// 匿名函数,创建共 10 个线程
		go func() {
			vote := requestVote() // 一个内部sleep随机时间,最后返回true的函数,模拟投票
			if vote {
				count++
			}
			finished++
		}()
	}
	for count < 5 && finished != 10 {
		// wait
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
}

使用go自带的资源竞争分析工具:

go run -race xxx.go

执行结果如下:

==================
WARNING: DATA RACE
Write at 0x00c00001c118 by goroutine 9:
  main.main.func1()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:20 +0x77

Previous read at 0x00c00001c118 by main goroutine:
  main.main()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:25 +0x147

Goroutine 9 (running) created at:
  main.main()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:17 +0x71
==================
==================
WARNING: DATA RACE
Write at 0x00c00001c128 by goroutine 9:
  main.main.func1()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:22 +0xa4

Previous read at 0x00c00001c128 by main goroutine:
  main.main()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:25 +0x164

Goroutine 9 (running) created at:
  main.main()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:17 +0x71
==================
received 5+ votes!
==================
WARNING: DATA RACE
Read at 0x00c00001c128 by goroutine 12:
  main.main.func1()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:22 +0x91

Previous write at 0x00c00001c128 by goroutine 11:
  main.main.func1()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:22 +0xa4

Goroutine 12 (running) created at:
  main.main()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:17 +0x71

Goroutine 11 (finished) created at:
  main.main()
      /home/wt/Backend/go/goprojects/src/golearndetail/concurrency/learn01/voteselect.go:17 +0x71
==================
Found 3 data race(s)
exit status 66

通过上述代码运行结果,可以得知存在多个协程的情况下对共享变量count、finished存在资源竞争。

Mutex解决资源竞争

Mutex基本介绍

互斥锁Mutex可以看作是针对某一个临机区或一组相关临界区的唯一访问令牌。
互斥锁的使用需要主要以下要求:

  1. 不要重复锁定互斥锁;
  2. 不要忘记解锁互斥锁,必要时使用defer语句;
  3. 不要对尚未锁定或者已解锁的互斥锁解锁;
  4. 不要在多个函数之间直接传递互斥锁。

代码演示

代码示例:

package main

import (
	"sync"
)

func main() {

	count := 0
	finished := 0
	var mu sync.Mutex

	for i := 0; i < 10; i++ {
		// 匿名函数,创建共 10 个线程
		go func() {
			vote := requestVote() // 一个内部sleep随机时间,最后返回true的函数,模拟投票
			// 临界区加锁
			mu.Lock()
			// 推迟到基本block结束后执行,这里即函数执行结束后 自动执行解锁操作。利用defer语言,一般在声明加锁后,立即defer声明推迟解锁
			defer mu.Unlock()
			if vote {
				count++
			}
			finished++
		}()
	}
	for {
		mu.Lock()
		if count > 5 || finished == 10 {
			// 不能在此处解锁,下面仍然需要对count变量进行判断
			//mu.Unlock()
			break
		}
		mu.Unlock()
		// wait
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
	mu.Unlock()
}

运行结果:

wt@wt:~/Backend/go/goprojects/src/golearndetail/concurrency/learn01$ go run -race voteselect_mutex.go utils.go 
received 5+ votes!

使用Mutex后,及时加锁以及解锁,不存在资源竞争。

Mutex+Cond解决资源竞争和CPU空转

Cond条件变量讲解

条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。同时需要注意条件变量不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。

如何初始化一个条件变量:

	var mu sync.Mutex
	cond := sync.NewCond(&mu)

上图使用锁mu的指针类型初始化一个cond条件变量:条件变量的初始化离不开互斥锁,并且条件变量的所有方法也是基于互斥锁的。

条件变量提供的方法有哪些:

  • 等待通知(wait):等待通知的使用需要基于条件变量相关联的互斥锁加锁的保护下使用。
  • 单发通知(signal):
  • 广播通知(broadcast):对于单发通知或者广播通知的使用与等待通知的使用恰恰相反,需要在对应的互斥锁解锁之后再使用。

代码演示

代码示例:

package main

import "sync"

func main() {

	count := 0
	finished := 0
	var mu sync.Mutex
	//cond := sync.NewCond(&mu)
	cond := sync.Cond{L: &mu}
	for i := 0; i < 10; i++ {
		// 匿名函数,创建共 10 个线程
		go func() {
			vote := requestVote() // 一个内部sleep随机时间,最后返回true的函数,模拟投票
			// 临界区加锁
			mu.Lock()
			// 这里只有一个waiter,所以用Signal或者Broadcast都可以
			defer cond.Broadcast()
			// 推迟到基本block结束后执行,这里即函数执行结束后 自动执行解锁操作。利用defer语言,一般在声明加锁后,立即defer声明推迟解锁
			defer mu.Unlock()
			if vote {
				count++
			}
			finished++
			// Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护;Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。
			
		}()
	}
	mu.Lock()
	for count < 5 || finished != 10 {
		// 如果条件不满足,则在制定的条件变量上wait。内部原子地进入sleep状态,并释放与条件变量关联的锁。当条件变量得到满足时,这里内部重新获取到条件变量关联的锁,函数返回。
		cond.Wait()
		// 使用cond.Wait的目的防止CPU空转,使用time.sleep()无法控制合适的休眠时间
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
	mu.Unlock()
}

执行结果:

wt@wt:~/Backend/go/goprojects/src/golearndetail/concurrency/learn01$ go run -race voteselect_mutex_cond.go utils.go 
received 5+ votes!

代码片段关于wait和for循环的解释说明

观察下面代码片段:

	for count < 5 || finished != 10 {
		// 如果条件不满足,则在制定的条件变量上wait。内部原子地进入sleep状态,并释放与条件变量关联的锁。当条件变量得到满足时,这里内部重新获取到条件变量关联的锁,函数返回。
		cond.Wait()
		// 使用cond.Wait的目的防止CPU空转,使用time.sleep()无法控制合适的休眠时间
	}
// -----------------------------------------------------------------------
		// 临界区加锁
		mu.Lock()
		// Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护;Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。
		// 这里只有一个waiter,所以用Signal或者Broadcast都可以
		defer cond.Broadcast()
		// 推迟到基本block结束后执行,这里即函数执行结束后 自动执行解锁操作。利用defer语言,一般在声明加锁后,立即defer声明推迟解锁
		defer mu.Unlock()

上述代码中的条件变量cond调用Wait()方法时做了哪些?同时为什么需要使用for循环进行状态判断而不是if条件进行状态判断?

条件变量Wait方法主要做了四件事:

  1. 把调用它的 goroutine(也就是当前的 goroutine)加入到当前条件变量的通知队列中。
  2. 解锁当前的条件变量基于的那个互斥锁:所以调用Wait()方法之前必须将条件变量关联的锁进行上锁,如果没有锁定互斥锁,在执行解锁操作时会触发一个panic。
  3. 让当前的 goroutine 处于等待状态,等到通知到来时再决定是否唤醒它。此时,这个goroutine 就会阻塞在调用这个Wait方法的那行代码上。
  4. 如果通知到来并且决定唤醒这个 goroutine,那么就在唤醒它之后重新锁定当前条件变量基于的互斥锁。自此之后,当前的 goroutine 就会继续执行后面的代码了。

调用Wait()方法后能否不解锁当前互斥锁?
不可以:如果当前goroutine不解锁当前互斥锁,依靠其他goroutine来解锁当前互斥锁,可能发生多次解锁,引发panic。所以不能通过其他goroutine来解锁当前互斥锁,只能当前goroutine来解锁。如果当前goroutine不解锁对应互斥锁,那么没有任何goroutine能够进入临界区,并改变共享资源的状态。程序所有goroutine全部阻塞。

为什么要使用for循环对条件变量进行判断,而不是使用if对条件变量进行判断?
if语句只会对共享资源的状态检查一次,而for语句却可以做多次检查,直到这个状态改变为止。 例如:如果一个 goroutine 因收到通知而被唤醒,但却发现共享资源的状态,依然不符合它的要求,那么就应该再次调用条件变量的Wait方法,并继续等待下次通知的到来。(上面代码演示的就是这个效果,如果使用If第一次唤醒后不满足条件就直接退出,无法进行后续的判断。)

条件变量Signal方法和Broadcast方法有那些异同

相同点:

  • Signal方法和Broadcast方法都是被用来发送通知的,

不同点:

  • Signal方法只会唤醒一个因此而等待的 goroutine,而Broadcast的通知却会唤醒所有为此等待的 goroutine。
  • 条件变量的Wait方法总会把当前的 goroutine 添加到通知队列的队尾,而它的Signal方法总会从通知队列的队首开始查找可被唤醒的 goroutine。所以,因Signal方法的通知而被唤醒的goroutine 一般都是最早等待的那一个。

Signal方法和Broadcast方法使用注意事项:

  1. 条件变量的Signal方法和Broadcast方法并不需要在互斥锁的保护下执行。恰恰相反,我们最好在解锁条件变量基于的那个互斥锁之后,再去调用它的这两个方法。这更有利于程序的运行效率。(上述代码展示)
  2. 条件变量的通知具有即时性。也就是说,如果发送通知的时候没有 goroutine为此等待,那么该通知就会被直接丢弃。在这之后才开始等待的 goroutine 只可能被后面的通知唤醒。

chan解决资源竞争

代码示例:

package main

func main() {

	count := 0
	finished := 0
	ch := make(chan bool)
	for i := 0; i < 10; i++ {
		// 匿名函数,创建共 10 个线程
		go func() {
			ch <- requestVote()
		}()
	}
	// 这里实现并不完美,如果count >= 5了,主线程不会再监听channel,导致其他还在运行的子线程会阻塞在往channel写数据的步骤。
	// 但是这里主线程退出后子线程们也会被销毁,影响不大。但如果是在一个长期运行的大型工程中,这里就存在泄露线程leaking threads
	for count < 5 && finished != 10 {
		// 主线程在这里等待
		v := <-ch
		if v {
			count++
		}
		finished++
	}
	if count >= 5 {
		println("received 5+ votes!")
	} else {
		println("lost")
	}
}

上述代码中涉及到一个线程泄漏的问题:

在Go语言中,"泄露线程leaking threads"通常指的是goroutine泄漏。Goroutine是Go语言中实现并发的轻量级线程,它们应该是临时的和短暂的。然而,如果一个goroutine因为某些原因无法正常结束,它就会一直运行,从而导致资源无法释放,这就是所谓的goroutine泄漏。
上面代码如果长期运行,由于非缓冲通道没有数据接收者,会导致部分goroutine一直阻塞在向其写数据的操作中,造成线程泄漏。

对于非缓冲通道的理解可以参考我的另一篇博文:
https://blog.csdn.net/weixin_45863010/article/details/142618550?spm=1001.2014.3001.5501
执行结果:

wt@wt:~/Backend/go/goprojects/src/golearndetail/concurrency/learn01$ go run -race voteselect_channel.go utils.go 
received 5+ votes!