channel
先看一下源码中的说明
At least one of c.sendq and c.recvq is empty, except for the case of an unbuffered channel with a single goroutine blocked on it for both sending and receiving using a select statement, in which case the length of c.sendq and c.recvq is limited only by the size of the select statement.
For buffered channels, also:
c.qcount > 0 implies that c.recvq is empty.
c.qcount < c.dataqsiz implies that c.sendq is empty.
c.sendq和c.recvq至少有一个是empty;有一种情况例外:对于一个无缓冲的chan,在select语句中使用chan收发数据会被single gouroutine阻塞,[演示示例如下],此时c.sendq和c.recv.q的长度由select的size的决定;对于有缓冲的chan:
1.c.qcount>0,表明c.recvq为empty
2.c.qcount<c.dataqsiz表明sendq为empty
- 对于"unbuffered chan",放在select语句中会被阻塞
- 放在一个goroutine中会产生"dead lock"
- 放在不同的协程中就不会产生dead lock
- 对于nil chan放在同一个协程中和不同的协程中都会被阻塞产生deadlock;放在select语句可以走default分支
- nil chan只是声明而没有定义,没有为其分配内存;empty chan为其分配内存但是无缓冲
hchan
接下来看一下chan结构体"hchan"的定义
type hchan struct {
qcount uint // buf中的元素个数
dataqsiz uint // buf的容量
buf unsafe.Pointer // 指向存储数据的底层数组
elemsize uint16 //元素大小
closed uint32 //标识chan是否关闭
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
//Lock保护hchan中的所有字段,以及在此通道上阻塞的sudogs中的几个字段。
lock mutex
}
type waitq struct {
first *sudog
//sudog represents a g in a wait list,
//such as for sending/receiving on a channel.
last *sudog
}
//只列出了部分字段
type sudog struct {
g *g
next *sudog
prev *sudog
......
......
c *hchan // channel
}
makechan
利用"make(chan int,1)“创建并初始化一个chan,会调用"makechan”
func makechan(t *chantype, size int) *hchan {
//获取chan中存储元素的相关信息
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//elem.size表示元素的大小,size表示元素的个数
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
//元素大小为0或者元素个数为0
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
//元素不包含指针类型的数据,一次性分配hchan和buf
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
//元素含有指针,需要先为hchan分配内存,然后再为buf分配内存,这样做是为了方便gc回收
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
总结一下分配的方式:
- 对于无缓冲的chan [size==0]或者chan中存储的元素大小为0[elemsize==0]只分配hchan结构体大小的内存
- 不含有指针类型的数据,一次性分配hchan+bufsize大小的内存
- 含有指针类型的数据,先为hchan结构体分配内存,然后再为buf分配内存
chansend
接下来看一下,向chan中发送数据的流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
//当前的协程因向nilchan发送send而被挂起阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
/*
fast path:
在不获取锁的情况下,检查失败的非阻塞操作
full(c)为true的情况:
1.无缓冲但是没有等待接收的reciver
2.有缓冲但是缓冲通道满了
*/
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//上锁
lock(&c.lock)
if c.closed != 0 {
//chan已经被关闭解锁,并报panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//c.recvq.dequeue获取recvq的第一个sudog
if sg := c.recvq.dequeue(); sg != nil {
//如果有正在等待的reciver,可以直接将数据拷贝给该
//reciver;绕过chan的缓冲区;拷贝完之后释放锁;
//send详解见下面
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//有发送数据的位置
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
//chanbuf返回buf中第sendx个 slot
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
//将ep拷贝到qp
typedmemmove(c.elemtype, qp, ep)
//sendx指向下一个发送数据的位置,将其加1
c.sendx++
//循环数组
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//chan中元素个数加1
c.qcount++
//解锁
unlock(&c.lock)
return true
}
if !block {
//没有正在等待接收数据的reciver也没有可用的空位置存储数据,在非阻塞模式下直接解锁+返回
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
/*
getg returns the pointer to the current g. The compiler rewrites calls to this function into instructions that fetch the g directly (from TLS or from the dedicated register)
返回当前正在运行的goroutine
*/
gp := getg()
//新创建一个sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
//初始化mysg
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//将mysg加入到sendq
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
//将当前协程挂起,挂起原因是send chan
// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
//确保qp对象在reciver接受它之前是有效的
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
//chan 被关闭
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
/*
full reports 向chan发送数据是否会被阻塞[通道满的情况下会被阻塞]
*/
func full(c *hchan) bool {
//c.dataqsiz是一个不可变的字段,创建之后不会被改变。
//因此对它的读是并发安全的
if c.dataqsiz == 0 {
//无缓冲的情况,此时要看是否有等待读取的协程
return c.recvq.first == nil
}
// 通道已满
return c.qcount == c.dataqsiz
}
//获取waitq中的一个sudog
func (q *waitq) dequeue() *sudog {
for {
//获取第一个sudog
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
//waitq中只有一个sudog
if y == nil {
q.first = nil
q.last = nil
} else {
//ulink and modify first sudog
y.prev = nil
q.first = y
sgp.next = nil // mark as removed (see dequeueSudoG)
}
/*
if a goroutine was put on this queue
because of a select, there is a small
window between the goroutine being woken
up by a different case and it grabbing the
channel locks.
Once it has the lock it removes itself
from the queue, so we won't see it after
that. We use a flag in the G struct to
tell us when someone else has won the race
to signal this goroutine but the goroutine
hasn't removed itself from the queue yet.
如果一个goroutine因为select操作被阻塞而被放入各个case中的waitq[recvq\sendq]中,在这个goroutine被不同的case唤醒到获取chan lock之间有一小段间隙;
一旦这个goroutine获取到lock,就会从waitq中移除
在g结构体中有一个字段[selectDone]用来通知goroutine1有其它goroutine2已经拿到了这个锁但是goroutine1还没有从waitq中移除。
*/
/*
isSelect indicates g is participating in a select, so g.selectDone must be CAS'd to win the wake-up race
*/
//如果没有成功就会直接退出,进入下一个循环
if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
continue
}
/*
这里说一下个人的理解:
select 的第一个case
对于一个chan1 的recvq,第一个被阻塞的g1是因为等待接收数据而被阻塞的,g1并非当前执行select的协程;
select 的第二个case
向chan2发送数据,刚开始这两个case都被阻塞,于是这个goroutine被加入到chan1的recvq和chan2的sendq,第二轮循环中,向chan2写数据成功,goroutine被唤醒,然后当前goroutine的selectDone字段被标识为1,与此同时从chan1接收数据成功,但此时会通过selectDone标识判断出当前的g已经被其它case唤醒,于是会继续寻找下一个sudog[当前的goroutine已经从waitq中移除]
*/
return sgp
}
}
//向waitq中加入一个sudog
func (q *waitq) enqueue(sgp *sudog) {
sgp.next = nil
x := q.last
if x == nil {
sgp.prev = nil
q.first = sgp
q.last = sgp
return
}
sgp.prev = x
x.next = sgp
q.last = sgp
}
/*
send在一个empty channel上执行发送操作;发送方发送的ep值被拷贝到接收方的sg中。然后reciver被唤醒;
channel必须是empty或者被锁定。ep必须是非空的
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
//解锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//Mark gp ready to run
//标记状态由_Gwaiting到_Grunnable
/*
先尝试放入本地p的runnext字段,然后是本地runq
如果这两个地方都满了,会将其放入全局runq
*/
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// No need for cgo write barrier checks because dst is always
// Go memory.
//将src的数据拷贝到dst
memmove(dst, src, t.size)
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
总结一下发送数据的流程
向nil chan发送数据阻塞模式下会被挂起,非阻塞模式下返回false
首先从recvq中获取是否有正在等待接收数据的goroutine,有的话直接将发送者的数据拷贝到与goroutine绑定的sudog中
没有正在等待接收数据的reciver,但是缓冲区未满将发送者的数据发送到存储数据的循环数组中,并将元素个数加1
既没有正在等待接收数据的reciver也没有可用的缓冲位置,生成新的sudog结构与当前goroutine绑定加入到sendq的末尾,通过gopark将其挂起等待reciver从chan中读取数据
对nil通道close会产生panic
在真正进行发送数据的流程之前会进行加锁操作,在第一种情况发生之后会释放锁,否则会持续到第二种或者第三种情况;糟糕的情况是,锁一直持续到第三种情况才释放,加大了临界区的范围延长了锁持有的时间,降低了并发性能。
通道已经被关闭,向其发送数据会产生panic
通道close之后再次被close会被panic
chanrecv
/*
chanrecv从chan中接收数据并将数据拷贝到ep中
如果ep是nil,接收到的数据会被忽略
如果block是false并且chan中没有数据会返回false
如果chan被关闭,不影响数据的读取,读取到的数据是类型对应的零值
*/
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
//从nil chan中获取数据;非阻塞模式下返回false,false
//阻塞模式下被挂起
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//非阻塞模式下
//empty(c)返回true的条件
//无缓冲通道并且没有正在等待发送数据的sender
//有缓冲通道但是c.qcount==0没有数据可以读取
if !block && empty(c) {
//通道没有被关闭
if atomic.Load(&c.closed) == 0 {
return
}
//通道被关闭
//empty(c)返回true的条件
//无缓冲通道并且没有正在等待发送数据的sender
//有缓冲通道但是c.qcount==0没有数据可以读取
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
//清空ep指向的内存
if ep != nil {
typedmemclr(c.elemtype, ep)
}
//返回true是因为chan be closed
//通道被关闭会通知所有监听该chan的goroutine
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//上锁
lock(&c.lock)
//chan关闭之后不能写入数据但是可以读取数据
if c.closed != 0 {
//缓冲区中没有数据
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
//没有数据可以读解锁
unlock(&c.lock)
//清空ep指向的memory
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
//走到这个分支表明缓冲区中有数据并且
//发现有阻塞等待发送数据的sender,发生的条件有
//无缓冲通道;有缓冲通道但是缓冲区满
if sg := c.sendq.dequeue(); sg != nil {
//从阻塞等待发送数据的sender中获取数据
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
//走到这个分支表明通道被关闭并且缓冲区有数据或者
//通道没有被关闭并且有数据可以读取
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
//将qp中的数据拷贝到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//清楚qp
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//元素个数减少
c.qcount--
//解锁
unlock(&c.lock)
return true, true
}
//没有数据可以读非阻塞直接返回
if !block {
//解锁
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
//创建新的sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
//与当前goroutine绑定
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
//加入到recvq
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
//将当前的goroutine挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
func empty(c *hchan) bool {
// c.dataqsiz is immutable.
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
/*
recv在一个full chan[buf is full]上执行接收数据的操作
对于无缓冲通道直接从sender中拷贝数据
对于有缓冲通道
1.将the head of queue的数据拷贝到reciver中
2.将sender的数据发送到缓冲区
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//无缓冲通道
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
//直接从sender中拷贝数据
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
//获取接受数数据的slot
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//空出缓冲区,将sender中的数据发送到缓冲区
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//抽取一个元素加入一个元素,buf依旧是满的
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
//释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
//因chan close被唤醒标记为false;反之标记为true
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//将gp由_Gwaiting变为_Grunnable
//尝试加入到本地p的runnext,
//本地p的runq以及全局的runq
goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
总结一下读取数据的流程
对于nil chan,读取数据会被gopark挂起
chan be closed并且没有数据可读,返回
chan be closed并且有正在等待发送数据的sender
1.对于无缓冲,直接从sender那里拷贝数据;
2.对于有缓冲,先从queue的头部获取数据
然后将sender的数据发送到缓冲区
将sender的状态由_Gwaiting变为_Grunnable计入到本地p的runnext等待被调度通道被关闭有数据可读或者没有被关闭但是有数据可读,直接从queue中读取数据,元素个数减少
通道没有被关闭没有数据可以读。创建新的sudog与接收数据的g绑定加入到recvq的末尾,通过gopark挂起等待其它goroutine向chan中发送数据
complie
在select中使用"ch<-i"或者"<-ch"会被编译成if-else语句
// compiler implements
select {
case c <- v:
... foo
default:
... bar
}
// as
if selectnbsend(c, v) {
... foo
} else {
... bar
}
// compiler implements
select {
case v, ok = <-c:
... foo
default:
... bar
}
// as
if selected, ok = selectnbrecv(&v, c); selected {
... foo
} else {
... bar
}
closechan
func closechan(c *hchan) {
//关闭nil chan会报错
if c == nil {
panic(plainError("close of nil channel"))
}
//上锁
lock(&c.lock)
//chan 已经被关闭过
if c.closed != 0 {
//释放锁,报panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
//将close字段标识为1
c.closed = 1
//
var glist gList
// release all readers
//通知所有的reader/reciver
for {
//从recvq中获取等待读取的sudog
sg := c.recvq.dequeue()
if sg == nil {
//遍历完成
break
}
if sg.elem != nil {
//清空sg.elem指向的内存
typedmemclr(c.elemtype, sg.elem)
//将指针悬空,防止成为野指针
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//获取与sudog绑定的g
gp := sg.g
gp.param = unsafe.Pointer(sg)
//因为关闭被唤醒设置为false
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
//将gp加入到glist中
glist.push(gp)
}
// release all writers (they will panic)
//释放所有的sender,并报panic
//因为向closed chan发送数据会产生panic
for {
//获取sendq中的sender
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//获取sudog中的g
gp := sg.g
gp.param = unsafe.Pointer(sg)
//因为close而被唤醒,设置为false
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
//将其加入到glist中
glist.push(gp)
}
//唤醒所有的sender和reciver才可以解锁
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
//将gp由_Gwaiting变为_Grunnable
//尝试加入到本地p的runnext,
//本地p的runq以及全局的runq
goready(gp, 3)
}
}
- 通道为nil,报panic: close of nil channel
- 通道已经关闭(通过closed标识判断),报panic: close of closed channel
- 唤醒recvq中的所有recver,并将唤醒的g加入到glist
- 唤醒sendq中的所有sender并报panic: send on closed channel。并将唤醒的g加入到glist