go|channel源码分析

发布于:2025-06-02 ⋅ 阅读:(26) ⋅ 点赞:(0)

channel

先看一下源码中的说明

At least one of c.sendq and c.recvq is empty, except for the case of an unbuffered channel with a single goroutine blocked on it for both sending and receiving using a select statement, in which case the length of c.sendq and c.recvq is limited only by the size of the select statement.
For buffered channels, also:
c.qcount > 0 implies that c.recvq is empty.
c.qcount < c.dataqsiz implies that c.sendq is empty.

c.sendq和c.recvq至少有一个是empty;有一种情况例外:对于一个无缓冲的chan,在select语句中使用chan收发数据会被single gouroutine阻塞,[演示示例如下],此时c.sendq和c.recv.q的长度由select的size的决定;对于有缓冲的chan:
1.c.qcount>0,表明c.recvq为empty
2.c.qcount<c.dataqsiz表明sendq为empty

  • 对于"unbuffered chan",放在select语句中会被阻塞
    在这里插入图片描述
  • 放在一个goroutine中会产生"dead lock"
    在这里插入图片描述
  • 放在不同的协程中就不会产生dead lock
    在这里插入图片描述
  • 对于nil chan放在同一个协程中和不同的协程中都会被阻塞产生deadlock;放在select语句可以走default分支
  • nil chan只是声明而没有定义,没有为其分配内存;empty chan为其分配内存但是无缓冲

hchan

接下来看一下chan结构体"hchan"的定义

type hchan struct {
	qcount   uint           // buf中的元素个数
	dataqsiz uint           // buf的容量
	buf      unsafe.Pointer // 指向存储数据的底层数组
	elemsize uint16  //元素大小
	closed   uint32  //标识chan是否关闭
	elemtype *_type // element type

	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	//Lock保护hchan中的所有字段,以及在此通道上阻塞的sudogs中的几个字段。
	lock mutex
}
type waitq struct {
	first *sudog 
	//sudog represents a g in a wait list, 
	//such as for sending/receiving on a channel.
	last  *sudog
}
//只列出了部分字段
type sudog struct {
	g *g

	next *sudog
	prev *sudog
	......
	......
	c        *hchan // channel
}

makechan

利用"make(chan int,1)“创建并初始化一个chan,会调用"makechan”

func makechan(t *chantype, size int) *hchan {
	//获取chan中存储元素的相关信息
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	//elem.size表示元素的大小,size表示元素的个数
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	
	var c *hchan
	switch {
	case mem == 0:
		//元素大小为0或者元素个数为0
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		//元素不包含指针类型的数据,一次性分配hchan和buf
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		//元素含有指针,需要先为hchan分配内存,然后再为buf分配内存,这样做是为了方便gc回收
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

总结一下分配的方式:

  • 对于无缓冲的chan [size==0]或者chan中存储的元素大小为0[elemsize==0]只分配hchan结构体大小的内存
  • 不含有指针类型的数据,一次性分配hchan+bufsize大小的内存
  • 含有指针类型的数据,先为hchan结构体分配内存,然后再为buf分配内存

chansend

接下来看一下,向chan中发送数据的流程

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		//当前的协程因向nilchan发送send而被挂起阻塞
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	/*
	fast path:
	在不获取锁的情况下,检查失败的非阻塞操作
	full(c)为true的情况:
	1.无缓冲但是没有等待接收的reciver
	2.有缓冲但是缓冲通道满了
	*/
	if !block && c.closed == 0 && full(c) {
		return false
	}
	
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	//上锁
	lock(&c.lock)

	if c.closed != 0 {
	//chan已经被关闭解锁,并报panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	//c.recvq.dequeue获取recvq的第一个sudog
	if sg := c.recvq.dequeue(); sg != nil {
		//如果有正在等待的reciver,可以直接将数据拷贝给该
		//reciver;绕过chan的缓冲区;拷贝完之后释放锁;
		//send详解见下面
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	//有发送数据的位置
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		//chanbuf返回buf中第sendx个 slot
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		//将ep拷贝到qp
		typedmemmove(c.elemtype, qp, ep)
		//sendx指向下一个发送数据的位置,将其加1
		c.sendx++
		//循环数组
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		//chan中元素个数加1
		c.qcount++
		//解锁
		unlock(&c.lock)
		return true
	}

	if !block {
	//没有正在等待接收数据的reciver也没有可用的空位置存储数据,在非阻塞模式下直接解锁+返回
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	/*
	getg returns the pointer to the current g. The compiler rewrites calls to this function into instructions that fetch the g directly (from TLS or from the dedicated register)
	返回当前正在运行的goroutine
	*/
	gp := getg()
	//新创建一个sudog
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	//初始化mysg
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	
	//将mysg加入到sendq
	c.sendq.enqueue(mysg)
	
	gp.parkingOnChan.Store(true)
	//将当前协程挂起,挂起原因是send chan
	// Puts the current goroutine into a waiting state and calls unlockf on the
	// system stack.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	//确保qp对象在reciver接受它之前是有效的
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	//chan 被关闭
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
/*
full reports 向chan发送数据是否会被阻塞[通道满的情况下会被阻塞]
*/
func full(c *hchan) bool {
	//c.dataqsiz是一个不可变的字段,创建之后不会被改变。
	//因此对它的读是并发安全的
	if c.dataqsiz == 0 {
		//无缓冲的情况,此时要看是否有等待读取的协程
		return c.recvq.first == nil
	}
	// 通道已满
	return c.qcount == c.dataqsiz
}

//获取waitq中的一个sudog

func (q *waitq) dequeue() *sudog {
	for {
	//获取第一个sudog
		sgp := q.first
		if sgp == nil {
			return nil
		}
		
		y := sgp.next
		//waitq中只有一个sudog
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
		//ulink and modify first sudog 
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}
		/*
		if a goroutine was put on this queue 
		because of a select, there is a small 
		window between the goroutine being woken 
		up by a different case and it grabbing the 
		channel locks.  
		Once it has the lock it removes itself 
		from the queue, so we won't see it after 
		that.  We use a flag in the G struct to 
		tell us when someone else has won the race 
		to signal this goroutine but the goroutine 
		hasn't removed itself from the queue yet.
		如果一个goroutine因为select操作被阻塞而被放入各个case中的waitq[recvq\sendq]中,在这个goroutine被不同的case唤醒到获取chan lock之间有一小段间隙;
		一旦这个goroutine获取到lock,就会从waitq中移除
		在g结构体中有一个字段[selectDone]用来通知goroutine1有其它goroutine2已经拿到了这个锁但是goroutine1还没有从waitq中移除。
		*/
		/*
		isSelect indicates g is participating in a select, so g.selectDone must be CAS'd to win the wake-up race
		*/
		//如果没有成功就会直接退出,进入下一个循环
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}
		/*
		这里说一下个人的理解:
		select 的第一个case
		对于一个chan1 的recvq,第一个被阻塞的g1是因为等待接收数据而被阻塞的,g1并非当前执行select的协程;
		select 的第二个case
		向chan2发送数据,刚开始这两个case都被阻塞,于是这个goroutine被加入到chan1的recvq和chan2的sendq,第二轮循环中,向chan2写数据成功,goroutine被唤醒,然后当前goroutine的selectDone字段被标识为1,与此同时从chan1接收数据成功,但此时会通过selectDone标识判断出当前的g已经被其它case唤醒,于是会继续寻找下一个sudog[当前的goroutine已经从waitq中移除] 
		
		*/

		return sgp
	}
}
//向waitq中加入一个sudog
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}
/*
send在一个empty channel上执行发送操作;发送方发送的ep值被拷贝到接收方的sg中。然后reciver被唤醒;
channel必须是empty或者被锁定。ep必须是非空的
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	//解锁
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	//Mark gp ready to run
	//标记状态由_Gwaiting到_Grunnable
	/*
	先尝试放入本地p的runnext字段,然后是本地runq
	如果这两个地方都满了,会将其放入全局runq
	*/
	goready(gp, skip+1)
}


func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	//将src的数据拷贝到dst
	memmove(dst, src, t.size)
}
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

总结一下发送数据的流程

  • 向nil chan发送数据阻塞模式下会被挂起,非阻塞模式下返回false

  • 首先从recvq中获取是否有正在等待接收数据的goroutine,有的话直接将发送者的数据拷贝到与goroutine绑定的sudog中

  • 没有正在等待接收数据的reciver,但是缓冲区未满将发送者的数据发送到存储数据的循环数组中,并将元素个数加1

  • 既没有正在等待接收数据的reciver也没有可用的缓冲位置,生成新的sudog结构与当前goroutine绑定加入到sendq的末尾,通过gopark将其挂起等待reciver从chan中读取数据

  • 对nil通道close会产生panic
    在真正进行发送数据的流程之前会进行加锁操作,在第一种情况发生之后会释放锁,否则会持续到第二种或者第三种情况;糟糕的情况是,锁一直持续到第三种情况才释放,加大了临界区的范围延长了锁持有的时间,降低了并发性能。
    在这里插入图片描述

  • 通道已经被关闭,向其发送数据会产生panic
    在这里插入图片描述

  • 通道close之后再次被close会被panic
    在这里插入图片描述

chanrecv


/*
chanrecv从chan中接收数据并将数据拷贝到ep中
如果ep是nil,接收到的数据会被忽略
如果block是false并且chan中没有数据会返回false
如果chan被关闭,不影响数据的读取,读取到的数据是类型对应的零值

*/
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	//从nil chan中获取数据;非阻塞模式下返回false,false
	//阻塞模式下被挂起
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	//非阻塞模式下
	//empty(c)返回true的条件
	//无缓冲通道并且没有正在等待发送数据的sender
	//有缓冲通道但是c.qcount==0没有数据可以读取
	
	if !block && empty(c) {
		//通道没有被关闭
		if atomic.Load(&c.closed) == 0 {
			
			return
		}
		//通道被关闭
		//empty(c)返回true的条件
		//无缓冲通道并且没有正在等待发送数据的sender
		//有缓冲通道但是c.qcount==0没有数据可以读取
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			//清空ep指向的内存
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			//返回true是因为chan be closed
			//通道被关闭会通知所有监听该chan的goroutine
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	//上锁
	lock(&c.lock)
	
	//chan关闭之后不能写入数据但是可以读取数据
	if c.closed != 0 {
	//缓冲区中没有数据
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			//没有数据可以读解锁
			unlock(&c.lock)
			//清空ep指向的memory
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but the channel's buffer have data.
	} else {
	//走到这个分支表明缓冲区中有数据并且
		//发现有阻塞等待发送数据的sender,发生的条件有
		//无缓冲通道;有缓冲通道但是缓冲区满
		if sg := c.sendq.dequeue(); sg != nil {
			//从阻塞等待发送数据的sender中获取数据
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}
	//走到这个分支表明通道被关闭并且缓冲区有数据或者
	//通道没有被关闭并且有数据可以读取
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		//将qp中的数据拷贝到ep
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		//清楚qp
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		//元素个数减少
		c.qcount--
		//解锁
		unlock(&c.lock)
		return true, true
	}
	//没有数据可以读非阻塞直接返回
	if !block {
	//解锁
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	//创建新的sudog
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	//与当前goroutine绑定
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	//加入到recvq
	c.recvq.enqueue(mysg)
	
	gp.parkingOnChan.Store(true)
	//将当前的goroutine挂起
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}
/*
recv在一个full chan[buf is full]上执行接收数据的操作
对于无缓冲通道直接从sender中拷贝数据
对于有缓冲通道
1.将the head of queue的数据拷贝到reciver中
2.将sender的数据发送到缓冲区
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//无缓冲通道
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		//直接从sender中拷贝数据
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		//获取接受数数据的slot
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		//空出缓冲区,将sender中的数据发送到缓冲区
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		//抽取一个元素加入一个元素,buf依旧是满的
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	//释放锁
	unlockf()
	gp.param = unsafe.Pointer(sg)
	//因chan close被唤醒标记为false;反之标记为true
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	//将gp由_Gwaiting变为_Grunnable
	//尝试加入到本地p的runnext,
	//本地p的runq以及全局的runq
	goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

总结一下读取数据的流程

  • 对于nil chan,读取数据会被gopark挂起

  • chan be closed并且没有数据可读,返回

  • chan be closed并且有正在等待发送数据的sender
    1.对于无缓冲,直接从sender那里拷贝数据;
    2.对于有缓冲,先从queue的头部获取数据
    然后将sender的数据发送到缓冲区
    将sender的状态由_Gwaiting变为_Grunnable计入到本地p的runnext等待被调度

  • 通道被关闭有数据可读或者没有被关闭但是有数据可读,直接从queue中读取数据,元素个数减少

  • 通道没有被关闭没有数据可以读。创建新的sudog与接收数据的g绑定加入到recvq的末尾,通过gopark挂起等待其它goroutine向chan中发送数据

complie

在select中使用"ch<-i"或者"<-ch"会被编译成if-else语句

// compiler implements

	select {
	case c <- v:
		... foo
	default:
		... bar
	}

// as

	if selectnbsend(c, v) {
		... foo
	} else {
		... bar
	}
// compiler implements

	select {
	case v, ok = <-c:
		... foo
	default:
		... bar
	}

// as
	if selected, ok = selectnbrecv(&v, c); selected {
		... foo
	} else {
		... bar
	}

closechan

func closechan(c *hchan) {
	//关闭nil chan会报错
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	//上锁
	lock(&c.lock)
	//chan 已经被关闭过
	if c.closed != 0 {
	//释放锁,报panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}
	//将close字段标识为1
	c.closed = 1
	//
	var glist gList

	// release all readers
	//通知所有的reader/reciver
	for {
	//从recvq中获取等待读取的sudog
		sg := c.recvq.dequeue()
		if sg == nil {
		//遍历完成
			break
		}
		if sg.elem != nil {
		//清空sg.elem指向的内存
			typedmemclr(c.elemtype, sg.elem)
			//将指针悬空,防止成为野指针
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		//获取与sudog绑定的g
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		//因为关闭被唤醒设置为false
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		//将gp加入到glist中
		glist.push(gp)
	}

	// release all writers (they will panic)
	//释放所有的sender,并报panic
	//因为向closed chan发送数据会产生panic
	for {
	//获取sendq中的sender
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		//获取sudog中的g
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		//因为close而被唤醒,设置为false
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		//将其加入到glist中
		glist.push(gp)
	}
	//唤醒所有的sender和reciver才可以解锁
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		//将gp由_Gwaiting变为_Grunnable
		//尝试加入到本地p的runnext,
		//本地p的runq以及全局的runq
		goready(gp, 3)
	}
}
  • 通道为nil,报panic: close of nil channel
  • 通道已经关闭(通过closed标识判断),报panic: close of closed channel
  • 唤醒recvq中的所有recver,并将唤醒的g加入到glist
  • 唤醒sendq中的所有sender并报panic: send on closed channel。并将唤醒的g加入到glist

网站公告

今日签到

点亮在社区的每一天
去签到