数据结构
hchan:channel 数据结构
qcount:当前 channel 中存在多少个元素;
dataqsiz:当前 channel 能存放的元素容量(环形缓冲区的槽位数);
buf:channel 中用于存放元素的环形缓冲区;
elemsize:channel 元素类型的大小;
closed:标识 channel 是否关闭;
elemtype:channel 元素类型;
sendx:发送元素进入环形缓冲区的 index;
recvx:接收元素所处的环形缓冲区的 index;
recvq:因接收而陷入阻塞的协程队列;
sendq:因发送而陷入阻塞的协程队列;
// hchan is the runtime representation of a channel.
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16 // size in bytes of one element; makechan sets it from elem.Size_
closed uint32 // non-zero once closechan has closed the channel
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
waitq:阻塞的协程队列
first:队列头部
last:队列尾部
// waitq is a doubly linked FIFO queue of sudogs (goroutines blocked on a
// channel). enqueue appends at last; dequeue removes from first.
type waitq struct {
first *sudog // head of the queue; nil when the queue is empty
last *sudog // tail of the queue; nil when the queue is empty
}
// reflect_makechan is the runtime side of reflect.makechan. It forwards
// directly to makechan.
//
// The go:linkname directive below must stay in its exact machine-readable
// form: the compiler only recognizes a comment that starts with "//go:".
//
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}
// makechan64 is the int64-sized variant of makechan. It panics with
// "makechan: size out of range" when size does not survive a round trip
// through int, and otherwise delegates to makechan.
func makechan64(t *chantype, size int64) *hchan {
	n := int(size)
	if int64(n) != size {
		panic(plainError("makechan: size out of range"))
	}
	return makechan(t, n)
}
// makechan allocates and initializes the hchan for a channel whose element
// type is t.Elem and whose buffer holds size elements.
//
// It throws if the element size is 1<<16 bytes or more, or if hchanSize or
// the element alignment is incompatible with maxAlign. It panics with
// "makechan: size out of range" if size is negative, if the buffer size
// overflows, or if it exceeds maxAlloc-hchanSize.
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
// mem is the number of bytes needed for the element buffer.
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case !elem.Pointers():
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
// The slot address is c.buf plus i times the element size.
//
// chanbuf should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - github.com/fjl/memsize
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname chanbuf
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz != 0 {
		// Buffered channel: full when every slot is occupied.
		// Assumes that a uint read is relaxed-atomic.
		return c.qcount == c.dataqsiz
	}
	// Unbuffered channel: a send blocks unless a receiver is already waiting.
	// Assumes that a pointer read is relaxed-atomic.
	return c.recvq.first == nil
}
// entry point for c <- x from compiled code.
// It performs a blocking send of the value at elem by calling chansend
// with block set to true and the caller's PC.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
// chansend is the generic single-channel send. It sends the value pointed
// to by ep on channel c.
//
// If block is false the operation never parks the goroutine: it returns
// false when the send cannot complete immediately. If block is true the
// goroutine parks until a receiver takes the value or the channel is closed.
//
// A send on a nil channel returns false when block is false and parks
// forever otherwise. A send on a closed channel panics with
// "send on closed channel"; this includes a parked sender that is woken by
// closechan (mysg.success is false in that case).
//
// callerpc is only passed to the race detector. The result is true when the
// value was handed to a receiver or stored in the buffer.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}
// t0 is the start timestamp for block profiling; it stays 0 when profiling is off.
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
// Advance the send index, wrapping around the circular buffer.
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
// -1 asks whoever wakes us to record the wake-up time in releasetime.
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
// success is set to true by recv and to false by closechan.
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// skip is forwarded to goready as skip+1.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// A receiver that discards the value has a nil elem; skip the copy then.
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
// timerchandrain discards every element currently held in c's buffer and
// reports whether at least one element was discarded.
// It is meant for timer channels only, so it ignores waiting senders
// entirely (timer channels are only ever filled with non-blocking sends).
func timerchandrain(c *hchan) bool {
	// empty(c) must not be used here: we are called with
	// c.timer.sendLock held, and empty(c) calls c.timer.maybeRunChan,
	// which would deadlock. Since we are draining the channel, only the
	// element count matters, not whether it might be refilled.
	if atomic.Loaduint(&c.qcount) == 0 {
		return false
	}
	lock(&c.lock)
	drained := c.qcount > 0
	for ; c.qcount > 0; c.qcount-- {
		// Zero the slot at the receive index, then advance it with wrap-around.
		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
		if c.recvx++; c.recvx == c.dataqsiz {
			c.recvx = 0
		}
	}
	unlock(&c.lock)
	return drained
}
// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

// sendDirect copies one value of type t from src into the slot that the
// waiting receiver sg recorded in sg.elem.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src is on our stack, dst is a slot on another stack.
// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
// No need for cgo write barrier checks because dst is always
// Go memory.
memmove(dst, src, t.Size_)
}
// recvDirect copies one value of type t from the slot that the waiting
// sender sg recorded in sg.elem into dst.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}
// closechan closes channel c.
//
// It panics with "close of nil channel" if c is nil and with
// "close of closed channel" if c is already closed. Otherwise it marks the
// channel closed under c.lock, dequeues every waiting receiver and sender
// with success set to false, and readies those goroutines after the lock
// has been released.
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
// glist collects the goroutines to wake; they are readied only after unlock.
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
// Zero the receiver's destination, if it supplied one.
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
// empty reports whether a receive from c would block, i.e. whether the
// channel currently has nothing to deliver. The answer is atomically
// correct and sequentially consistent at the moment it is produced, but
// because the channel is not locked it may become non-empty right after.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		// Unbuffered: there is something to receive only if a sender is waiting.
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	// c.timer is immutable as well (set after make(chan) but before any
	// channel operation), and every timer channel has dataqsiz > 0.
	if t := c.timer; t != nil {
		t.maybeRunChan()
	}
	return atomic.Loaduint(&c.qcount) == 0
}
// entry points for <- c from compiled code.
//
// chanrecv1 performs a blocking receive into elem and discards both
// results of chanrecv.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// chanrecv2 performs a blocking receive into elem and returns the
// "received" result of chanrecv.
//
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
//
// A receive on a nil channel returns (false, false) when block is false
// and parks forever otherwise.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if c.timer != nil {
c.timer.maybeRunChan()
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
// t0 is the start timestamp for block profiling; it stays 0 when profiling is off.
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer still has data.
} else {
// The channel is not closed: look for a waiting sender.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// Zero the slot that was just consumed.
typedmemclr(c.elemtype, qp)
// Advance the receive index, wrapping around the circular buffer.
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
// -1 asks whoever wakes us to record the wake-up time in releasetime.
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// success is set to true by send and to false by closechan.
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
// skip is forwarded to goready as skip+1.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
// chanparkcommit is the callback that chansend and chanrecv pass to gopark,
// together with a pointer to the channel's lock. It sets gp.activeStackChans,
// clears gp.parkingOnChan, releases the channel lock, and always returns true.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
// There are unlocked sudogs that point into gp's stack. Stack
// copying must lock the channels of those sudogs.
// Set activeStackChans here instead of before we try parking
// because we could self-deadlock in stack growth on the
// channel lock.
gp.activeStackChans = true
// Mark that it's safe for stack shrinking to occur now,
// because any thread acquiring this G's stack for shrinking
// is guaranteed to observe activeStackChans after this store.
gp.parkingOnChan.Store(false)
// Make sure we unlock after setting activeStackChans and
// unsetting parkingOnChan. The moment we unlock chanLock
// we risk gp getting readied by a channel operation and
// so gp could continue running before everything before
// the unlock is visible (even to gp itself).
unlock((*mutex)(chanLock))
return true
}
// selectnbsend performs a non-blocking send of the value at elem on c
// (chansend with block set to false) and reports whether the send happened.
//
// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
// selectnbrecv performs a non-blocking receive from c into elem
// (chanrecv with block set to false) and returns chanrecv's results.
//
// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
// reflect_chansend is linknamed to reflect.chansend0. nb selects
// non-blocking mode: chansend is called with block set to !nb.
//
//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
return chansend(c, elem, !nb, getcallerpc())
}
// reflect_chanrecv is linknamed to reflect.chanrecv. nb selects
// non-blocking mode: chanrecv is called with block set to !nb.
//
//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
return chanrecv(c, elem, !nb)
}
// chanlen returns the number of elements queued in c, as reported by len(c).
// A nil channel has length 0. A timer channel reports 0 unless
// debug.asynctimerchan is non-zero, in which case the timer is given a
// chance to run first and the real count is returned.
func chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	if c.timer != nil {
		if debug.asynctimerchan.Load() == 0 {
			// timer channels have a buffered implementation
			// but present to users as unbuffered, so that we can
			// undo sends without users noticing.
			return 0
		}
		c.timer.maybeRunChan()
	}
	return int(c.qcount)
}
// chancap returns the buffer capacity of c, as reported by cap(c).
// A nil channel has capacity 0. A timer channel reports 0 unless
// debug.asynctimerchan is non-zero.
func chancap(c *hchan) int {
	switch {
	case c == nil:
		return 0
	case c.timer != nil && debug.asynctimerchan.Load() == 0:
		// timer channels have a buffered implementation
		// but present to users as unbuffered, so that we can
		// undo sends without users noticing.
		return 0
	}
	return int(c.dataqsiz)
}
// reflect_chanlen is linknamed to reflect.chanlen and forwards to chanlen.
//
//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
return chanlen(c)
}
// reflectlite_chanlen is linknamed to internal/reflectlite.chanlen and
// forwards to chanlen.
//
//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
return chanlen(c)
}
// reflect_chancap is linknamed to reflect.chancap and forwards to chancap.
//
//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
return chancap(c)
}
// reflect_chanclose is linknamed to reflect.chanclose and forwards to
// closechan.
//
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
closechan(c)
}
// enqueue appends sgp to the tail of q, overwriting sgp's next and prev
// links.
func (q *waitq) enqueue(sgp *sudog) {
	tail := q.last
	sgp.next = nil
	sgp.prev = tail
	q.last = sgp
	if tail == nil {
		// The queue was empty, so sgp is also the new head.
		q.first = sgp
		return
	}
	tail.next = sgp
}
// dequeue removes and returns the sudog at the head of q, or nil if the
// queue is empty. Sudogs that belong to a select whose goroutine has
// already been signaled by another case are dropped and skipped.
func (q *waitq) dequeue() *sudog {
	for sgp := q.first; sgp != nil; sgp = q.first {
		if next := sgp.next; next != nil {
			next.prev = nil
			q.first = next
			sgp.next = nil // mark as removed (see dequeueSudoG)
		} else {
			q.first, q.last = nil, nil
		}

		// If a goroutine was put on this queue because of a select,
		// there is a small window between the goroutine being woken up
		// by a different case and it grabbing the channel locks. Once it
		// has the lock it removes itself from the queue, so we won't see
		// it after that. A flag in the G struct tells us when someone
		// else has won the race to signal this goroutine but the
		// goroutine hasn't removed itself from the queue yet.
		if !sgp.isSelect || sgp.g.selectDone.CompareAndSwap(0, 1) {
			return sgp
		}
	}
	return nil
}
// raceaddr returns the address the race detector uses to represent
// operations on channel c: the address of the c.buf field.
func (c *hchan) raceaddr() unsafe.Pointer {
// Treat read-like and write-like operations on the channel to
// happen at this address. Avoid using the address of qcount
// or dataqsiz, because the len() and cap() builtins read
// those addresses, and we don't want them racing with
// operations like close().
return unsafe.Pointer(&c.buf)
}
// racesync reports to the race detector a synchronization between the
// current goroutine and sg.g, using the address of buffer slot 0 of c as
// the sync object: release by us, acquire then release by sg.g, acquire by us.
func racesync(c *hchan, sg *sudog) {
	addr := chanbuf(c, 0)
	racerelease(addr)
	raceacquireg(sg.g, addr)
	racereleaseg(sg.g, addr)
	raceacquire(addr)
}
// racenotify tells the race detector about a send or receive that involves
// buffer entry idx of channel c. When sg is nil the event is attributed to
// the current goroutine, otherwise to the communicating partner sg.g.
// Channels with c.elemsize == 0 are handled as a special case.
func racenotify(c *hchan, idx uint, sg *sudog) {
	// The unsafe.Pointer for entry idx could have been passed in place of
	// idx. Keeping idx leaves room for a future version to handle
	// elemsize==0 better, e.g. by calling TSan with c and idx so that Go
	// can keep not allocating buffer entries for zero-size elements while
	// the detector tracks one sync object per idx under the hood.
	qp := chanbuf(c, idx)

	// When elemsize==0, we don't allocate a full buffer for the channel.
	// Instead of individual buffer entries, the race detector uses c.buf
	// as the only buffer entry. This simplification prevents us from
	// following the memory model's happens-before rules (rules that are
	// implemented in racereleaseacquire). Instead, we accumulate
	// happens-before information in the synchronization object
	// associated with c.buf.
	switch zeroSize := c.elemsize == 0; {
	case zeroSize && sg == nil:
		raceacquire(qp)
		racerelease(qp)
	case zeroSize:
		raceacquireg(sg.g, qp)
		racereleaseg(sg.g, qp)
	case sg == nil:
		racereleaseacquire(qp)
	default:
		racereleaseacquireg(sg.g, qp)
	}
}
其他
const (
// maxAlign is the largest element alignment makechan accepts; hchanSize
// must also be a multiple of it.
maxAlign = 8
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to a multiple of maxAlign.
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
// debugChan enables the debug prints in makechan, chansend and chanrecv.
debugChan = false
)