1.简单了解什么是协程和通道
什么是协程
协程,是一种用户级的轻量级的线程,拥有独立的栈空间并共享程序的堆空间。
它是在单线程的基础上通过算法来实现的微线程,相比于多线程编程具有以下优点:
- 协程的上下文切换由用户决定,无需系统内核的上下文切换,减少开销
- 协程默认会做好全方位保护,以防止中断。无需原子操作锁
- 单线程也可以实现高并发,甚至达到单核CPU就支持上万协程
什么是通道
通道,是一种用于协程之间进行通信的数据结构。类似于队列,一端为发送者,一端为接收者。使用通道可以很好地保证数据的同步性和顺序性。
通道分为有缓冲通道和无缓冲通道,其声明方式如下:
- 有缓冲通道
intChan := make(chan int,<缓冲容量>)
- 无缓冲通道
intChan := make(chan int)
有缓冲通道和无缓冲通道的区别:
- 阻塞:无缓冲通道发送者会阻塞直到数据被接收;有缓冲通道发送者会阻塞直到缓冲区满,接收者会阻塞直到缓冲区不为空。
- 数据同步和顺序:无缓冲通道保证数据的同步和顺序;有缓冲管道不保证数据的同步和顺序。
- 应用场景:无缓冲通道要求严格的同步和顺序性;有缓冲通道可以异步通信并提高吞吐量。
在无缓冲通道的实现中需要注意的是,通道的两端必须存在发送者和接收者,否则会导致死锁。
2.协程-通道并发编程案例
(1)交替打印字母和数字
题意:使用协程-通道交替打印数字1-10和字母A-J。
代码:
package main
import (
"fmt"
"sync"
)
/*
无缓冲chanel:需要在写入chanel的时候要保证有另外一个协程在读取chanel。否则会导致写端阻塞,发生死锁
解决办法:
避免死锁的发生:
当i循环到10时,printAlp协程已然结束,所以此时不必再写入alp通道
*/
func printNum(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
defer wg.Done()
for i := 1; i <= 10; i++ {
<-alpCh // 等待字母goroutine发信号
fmt.Print(i, " ")
//避免死锁发生
if i < 10 {
numCh <- struct{}{} // 发信号给字母goroutine
}
if i == 10 {
close(numCh)
}
}
}
func printAlp(wg *sync.WaitGroup, numCh chan struct{}, alpCh chan struct{}) {
defer wg.Done()
for i := 'A'; i <= 'J'; i++ {
<-numCh // 等待数字goroutine发信号
fmt.Printf("%c", i)
alpCh <- struct{}{} // 发信号给数字goroutine
}
close(alpCh)
}
func main() {
numCh := make(chan struct{}) // 用于数字goroutine的信号通道
alpCh := make(chan struct{}) // 用于字母goroutine的信号通道
var wg sync.WaitGroup
wg.Add(2)
go printAlp(&wg, numCh, alpCh)
go printNum(&wg, numCh, alpCh)
// 启动时先给数字goroutine发送一个信号
numCh <- struct{}{}
wg.Wait()
}
题目分析:
题目要求我们交替打印字母和数字,则需要保证两个协程的严格顺序性,符合无缓冲通道的应用场景。设置两个通道,分别存储数字和字母,两个打印数字和字母的协程分别担任两个通道的发送者和接收者的两重身份。循环打印一次,发一次信号,提醒另一个协程进行打印。
需要注意的是当最后一个字符'10'打印结束后,此时打印字母的协程已经结束,numCh通道已经没有接收者,此时已经不符合无缓冲通道的实现条件-必须存在发送者和接收者,再发送信号,会引起阻塞死锁。所以再第10次时不必再发送信号。
(2)设计一个任务调度器
题目:设计一个任务调度器,利用多协程+通道的编程模式,实现并发处理多任务的业务场景,且要求调度顺序按照任务添加顺序。
代码:
type scheduler struct {
taskChan chan func()
wt sync.WaitGroup
}
func (td *scheduler) AddTask(task func()) {
td.taskChan <- task
}
func (td *scheduler) Executer() {
defer td.wt.Done()
for {
task, ok := <-td.taskChan
task()
if ok && len(td.taskChan) == 0 {
break
}
}
}
func (td *scheduler) Start() {
td.wt.Add(4)
//假设四个消费者
for i := 0; i < 4; i++ {
go td.Executer()
}
td.wt.Wait()
}
func main() {
sd := scheduler{
taskChan: make(chan func(), 5),
}
go func() {
sd.AddTask(func() {
fmt.Println("任务1")
})
sd.AddTask(func() {
fmt.Println("任务2")
})
sd.AddTask(func() {
fmt.Println("任务3")
})
sd.AddTask(func() {
fmt.Println("任务4")
})
sd.AddTask(func() {
fmt.Println("任务5")
})
sd.AddTask(func() {
fmt.Println("任务6")
})
close(sd.taskChan)
}()
sd.Start()
}
问题分析:
由于添加的任务为多任务,不止一个,并且需要异步处理执行这些任务。符合有缓冲区的通道需要提高吞吐量和异步处理。
那么,我们需要将任务放进通道,多个接收者,按照顺序从通道中拿任务,并进行执行即可。
需要注意的问题是,如果在添加的任务数量大于通道的缓冲区,会在添加任务形成阻塞。为了不影响消费者的正常启动,需要将其单独开一个协程来添加任务。
这样当消费者进行消费时,形成阻塞的生产者会被唤醒,从而继续进行任务添加。
3.总结
经过对协程+通道的编程模式的学习,除了刚刚在题目中提到的,我们还应该注意以下问题:
1.为什么通道用完之后要关闭,不关闭有什么风险?
- 为了避免死锁。关闭通道,也是告诉接收者,在发送者那里已经没有数据可以发送了,不需要再继续等待数据了。接收者收到通道关闭的信息后,停止接收数据;若不关闭通道,则会让接收者一直处于阻塞状态,有发生死锁的风险。
- 释放资源和避免资源泄露。关闭通道后,系统会释放相应的资源,及时关闭通道则可以避免资源浪费和泄露。
2. 怎么优雅地关闭通道?
首先,关闭通道的最基本原则是不要关闭已经关闭的通道。其次还有一个使用Go通道的原则:不要在数据接收方或者在有多个发送者的情况下关闭通道。换句话说,我们只应该让一个通道唯一的发送者关闭此通道。
一种粗鲁的方式是通过异常恢复的方式来关闭通道,但很明显违反以上的原则且有可能发生数据竞争;另一种方式是sync.Once或sync.Mutex来关闭通道,当并不保证发生在一个通道上的并发关闭操作和发送操纵不会产生数据竞争。这两种方式都有一定的问题,就不过多介绍,下面介绍一种如何优雅地关闭通道的方法。
情形一:M个接收者和一个发送者
最容易处理的一种情形。当发送者需要结束发送时,让它关闭通道即可。上文的两个编程案例就是这种情形。
情形二:一个接收者和N个发送者
根据Go通道的基本原则,我们只能在通道的唯一发送者关闭通道。所以,在这种情况下,我们无法直接在某处关闭通道。但我们可以让接收者关闭一个额外的信号通道来告诉发送者不要再发送数据了。
package main
import (
"log"
"sync"
)
func main() {
cosnt N := 5
cosnt Max := 60000
count := 0
dataCh := make(chan int)
stopCh := make(chan bool)
var wt sync.WaitGroup
wt.Add(1)
//发送者
for i := 0; i < N; i++ {
go func() {
for {
select {
case <-stopCh:
return
default:
count += 1
dataCh <- count
}
}
}()
}
//接收者
go func() {
defer wt.Done()
for value := range dataCh {
if value == Max {
// 此唯一的接收者同时也是stopCh通道的
// 唯一发送者。尽管它不能安全地关闭dataCh数
// 据通道,但它可以安全地关闭stopCh通道。
close(stopCh)
return
}
log.Println(value)
}
}()
wt.Wait()
}
在这种方法中,我们额外增加了一个信号通道stopCh,在接收方通过它来告诉发送者不必再接收数据。并且,此方法并没有对dataCh进行关闭,当一个通道不再被任何协程使用时,它将会逐渐被垃圾回收掉,无论它是否已经被关闭。
此方法的优雅性就在于通过关闭一个通道来停止另一个通道的使用,从而间接关闭另一个通道。
情形三:M个接收者N个发送者
我们不能让接收者和发送者中的任何一个关闭用来传输数据的通道,我们也不能让多个接收者之一关闭一个额外的信号通道。这两种做法都违反了通道关闭原则。
不过,我们可以引入一个中间调解者角色并让其关闭额外的信号通道来通知所有接收者和发送者结束工作。
代码示例:
package main
import (
"log"
"math/rand"
"strconv"
"sync"
)
func main() {
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
var wt sync.WaitGroup
wt.Add(NumReceivers)
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh是一个额外的信号通道。它的发送
// 者为中间调解者。它的接收者为dataCh
// 数据通道的所有的发送者和接收者。
toStop := make(chan string, 1)
// toStop是一个用来通知中间调解者让其
// 关闭信号通道stopCh的第二个信号通道。
// 此第二个信号通道的发送者为dataCh数据
// 通道的所有的发送者和接收者,它的接收者
// 为中间调解者。它必须为一个缓冲通道。
var stoppedBy string
// 中间调解者
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// 发送者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
// 为了防止阻塞,这里使用了一个尝试
// 发送操作来向中间调解者发送信号。
select {
case toStop <- "发送者#" + id:
default:
}
return
}
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// 接收者
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wt.Done()
for {
select {
case <-stopCh:
return
case value := <-dataCh:
if value == Max {
// 为了防止阻塞,这里使用了一个尝试
// 发送操作来向中间调解者发送信号。
select {
case toStop <- "接收者:" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
wt.Wait()
log.Println("被" + stoppedBy + "终止了")
}