1、背景
chan可以说是go中非常好用的并发控制模块,并且是并发安全的,采用了CSP思想(通过通信来共享内存,而不是通过共享内存来通信),chan虽然好用,但是在使用过程中一不小心可能就会使程序进入阻塞状态,排查起来困难,下面我们就根据chan的部分源码来理解初始化chan、从chan读数据、往chan写数据三部分原理。
2、go版本
$ go version
go version go1.21.4 windows/386
3、源码解释
【1】chan的底层结构
chan的底层结构位于:src/runtime/chan.go,其结构为:
type hchan struct {
qcount uint //通道中已经存储了的元素个数
dataqsiz uint //通道中就可以存储的最大元素个数
buf unsafe.Pointer //存储元素的缓冲区
elemsize uint16 //元素大小
closed uint32 //通道是否关闭
elemtype *_type //元素类型
sendx uint //即将向通道中写数据的缓冲区位置
recvx uint //即将向通道中读数据的缓冲区位置
recvq waitq //等待中的向通道读取数据的协程队列
sendq waitq //等待中的往通道写数据的协程队列
lock mutex //保证并发安全
}
chan的数据结构其实可以抽象成一个圆环结构如下:
【2】chan初始化
我们使用make初始化通道的时候,实质上是调用了makechan函数,位于:src/runtime/chan.go中,关键源码如下:
func makechan(t *chantype, size int) *hchan {
//...
//计算出通道缓冲区存储元素所需大小
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan //定义一个未初始化的chan对象
switch {
case mem == 0: //无缓冲区通道
c = (*hchan)(mallocgc(hchanSize, nil, true)) //申请一块hchan结构大小的内存
c.buf = c.raceaddr()
case elem.PtrBytes == 0: //有缓冲区通道并且存储的元素不包含指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) //申请chan结构大小+元素大小的内存
c.buf = add(unsafe.Pointer(c), hchanSize) //将申请内存大小后移chan结构大小后的地址赋值给存储元素的buf
default: //有缓冲区并且元素内包含指针
c = new(hchan) //申请一块hchan大小的内存
c.buf = mallocgc(mem, elem, true) //为存放元素的buf单独申请一块空间
}
//...
return c
}
总结一下初始化chan时,分为三种情况,用图表示如下
【3】往chan发送数据
往chan写数据实质上是调用了chansend函数,位于:src/runtime/chan.go中,关键源码如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil { //未初始化的通道,会挂起协程
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
//...
if c.closed != 0 { //往关闭的chan里写数据会panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//从等待从chan中读取数据的协程队列中取出第一个协程
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3) //将数据直接发送给读取的协程
return true
}
if c.qcount < c.dataqsiz { //缓冲区元素<缓冲区大小,说明缓冲区还有空间
qp := chanbuf(c, c.sendx) //得到要写入缓冲区的地址
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep) //将要写入的数据ep复制到缓冲区qp位置
c.sendx++ //写入缓冲区之后,写缓冲区的位置+1
if c.sendx == c.dataqsiz { //到缓冲区最后的位置之后从头开始写
c.sendx = 0
}
c.qcount++ //缓冲区数量+1
unlock(&c.lock)
return true
}
//...
//缓冲区空间不够放到协程队列中等待唤醒
c.sendq.enqueue(mysg)
//...
}
上述往chan写数据可以分为3个场景:
1、直接将数据发送到有往通道读取数据的协程。
2、缓冲区空间足够就将数据写入缓冲区。
3、缓冲区空间不足就进入写协程队列中等待唤醒。
通过上述代码段能总结写chan时的一些注意事项:
1、往未初始化的chan里写数据会阻塞。
2、往关闭的chan里写数据会panic。
3、往缓冲区写满的chan里写数据会阻塞。
【4】从chan读取数据
从chan读取数据调用的函数是chanrecv,位于:src/runtime/chan.go中,关键源码如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil { //通道未初始化,当前协程挂起
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
//...
if c.closed != 0 { //通道关闭
if c.qcount == 0 { //缓冲区无数据,不会有任何报错,所有能从关闭的无缓冲数据的通道里一直读取零值
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false //第二个返回值为false,这就是为啥可以检测通道是否关闭的原因
}
} else { //通道未关闭,从写chan的协程队列中取出第一个去读取数据
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 { //缓冲区有数据
qp := chanbuf(c, c.recvx) //得到读缓冲区位置地址
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //将读取到的缓冲区数据复制到ep中
}
typedmemclr(c.elemtype, qp) //清空已经被读取的缓冲区数据
c.recvx++ //读取缓冲区位置+1
if c.recvx == c.dataqsiz { //到缓冲区最后的位置后从起始位置开始读
c.recvx = 0
}
c.qcount-- //缓冲区数量-1
unlock(&c.lock)
return true, true
}
//...
//缓冲区中无数据读取就进入协程队列等待
c.recvq.enqueue(mysg)
//...
}
上述从chan读取数据也可以分为3个场景:
1、直接从写通道的协程中读取数据。
2、缓冲区有数据就从缓冲区读取数据。
3、缓冲区无数据就进入读协程队列中等待唤醒。
通过上述代码也能总结出读chan时的注意事项:
1、从未初始化的chan中读取数据会阻塞。
2、从已关闭的chan中读取数据,缓冲区有数据就会读取缓冲区数据,无数据就会得到零值。
3、chan关闭时,从chan中读取数据的第2个参数不一定是false,比如缓冲区有数据就为true。
4、缓冲区无数据时,从chan中读取就会阻塞。
4、总结
通过分析写chan、读chan的源码,我们在写代码时能很容易避免协程阻塞,能很清楚的理解未初始化chan和关闭chan对读写操作的影响。