【Go底层】通道原理

发布于:2024-12-08 ⋅ 阅读:(174) ⋅ 点赞:(0)

1、背景

chan可以说是go中非常好用的并发控制模块,并且是并发安全的,采用了CSP思想(通过通信来共享内存,而不是通过共享内存来通信),chan虽然好用,但是在使用过程中一不小心可能就会使程序进入阻塞状态,排查起来困难,下面我们就根据chan的部分源码来理解初始化chan、从chan读数据、往chan写数据三部分原理。

2、go版本

$ go version
go version go1.21.4 windows/386

3、源码解释

【1】chan的底层结构

chan的底层结构位于:src/runtime/chan.go,其结构为:

type hchan struct {
	qcount   uint           //通道中已经存储了的元素个数
	dataqsiz uint           //通道中就可以存储的最大元素个数
	buf      unsafe.Pointer //存储元素的缓冲区
	elemsize uint16 //元素大小
	closed   uint32 //通道是否关闭
	elemtype *_type //元素类型
	sendx    uint   //即将向通道中写数据的缓冲区位置
	recvx    uint   //即将向通道中读数据的缓冲区位置
	recvq    waitq  //等待中的向通道读取数据的协程队列
	sendq    waitq  //等待中的往通道写数据的协程队列
	lock mutex  	//保证并发安全
}

chan的数据结构其实可以抽象成一个圆环结构如下:
在这里插入图片描述

【2】chan初始化

我们使用make初始化通道的时候,实质上是调用了makechan函数,位于:src/runtime/chan.go中,关键源码如下:

func makechan(t *chantype, size int) *hchan {
	//...

	//计算出通道缓冲区存储元素所需大小
	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	
	var c *hchan //定义一个未初始化的chan对象
	switch {
	case mem == 0: //无缓冲区通道
		c = (*hchan)(mallocgc(hchanSize, nil, true)) //申请一块hchan结构大小的内存
		c.buf = c.raceaddr()
	case elem.PtrBytes == 0: //有缓冲区通道并且存储的元素不包含指针
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) //申请chan结构大小+元素大小的内存
		c.buf = add(unsafe.Pointer(c), hchanSize) //将申请内存大小后移chan结构大小后的地址赋值给存储元素的buf
	default: //有缓冲区并且元素内包含指针
		c = new(hchan) //申请一块hchan大小的内存
		c.buf = mallocgc(mem, elem, true) //为存放元素的buf单独申请一块空间
	}
	
	//...
	
	return c
}

总结一下初始化chan时,分为三种情况,用图表示如下
在这里插入图片描述

【3】往chan发送数据

往chan写数据实质上是调用了chansend函数,位于:src/runtime/chan.go中,关键源码如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {  //未初始化的通道,会挂起协程
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	//...

	if c.closed != 0 { //往关闭的chan里写数据会panic
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	//从等待从chan中读取数据的协程队列中取出第一个协程
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3) //将数据直接发送给读取的协程
		return true
	}

	if c.qcount < c.dataqsiz { //缓冲区元素<缓冲区大小,说明缓冲区还有空间
		qp := chanbuf(c, c.sendx) //得到要写入缓冲区的地址
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep) //将要写入的数据ep复制到缓冲区qp位置
		c.sendx++ //写入缓冲区之后,写缓冲区的位置+1
		if c.sendx == c.dataqsiz { //到缓冲区最后的位置之后从头开始写
			c.sendx = 0
		}
		c.qcount++ //缓冲区数量+1
		unlock(&c.lock)
		return true
	}

	//...
	
	//缓冲区空间不够放到协程队列中等待唤醒
	c.sendq.enqueue(mysg)
	
	//...
}

上述往chan写数据可以分为3个场景:

1、直接将数据发送到有往通道读取数据的协程。
2、缓冲区空间足够就将数据写入缓冲区。
3、缓冲区空间不足就进入写协程队列中等待唤醒。

通过上述代码段能总结写chan时的一些注意事项:

1、往未初始化的chan里写数据会阻塞。
2、往关闭的chan里写数据会panic。
3、往缓冲区写满的chan里写数据会阻塞。

【4】从chan读取数据

从chan读取数据调用的函数是chanrecv,位于:src/runtime/chan.go中,关键源码如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c == nil {  //通道未初始化,当前协程挂起
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	//...
	
	if c.closed != 0 { //通道关闭
		if c.qcount == 0 { //缓冲区无数据,不会有任何报错,所有能从关闭的无缓冲数据的通道里一直读取零值
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false //第二个返回值为false,这就是为啥可以检测通道是否关闭的原因
		}
	} else { //通道未关闭,从写chan的协程队列中取出第一个去读取数据
		if sg := c.sendq.dequeue(); sg != nil {
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 { //缓冲区有数据
		qp := chanbuf(c, c.recvx) //得到读缓冲区位置地址
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) //将读取到的缓冲区数据复制到ep中
		}
		typedmemclr(c.elemtype, qp) //清空已经被读取的缓冲区数据
		c.recvx++ //读取缓冲区位置+1
		if c.recvx == c.dataqsiz { //到缓冲区最后的位置后从起始位置开始读
			c.recvx = 0
		}
		c.qcount-- //缓冲区数量-1
		unlock(&c.lock)
		return true, true
	}

	//...

	//缓冲区中无数据读取就进入协程队列等待
	c.recvq.enqueue(mysg)
	
	//...
}

上述从chan读取数据也可以分为3个场景:

1、直接从写通道的协程中读取数据。
2、缓冲区有数据就从缓冲区读取数据。
3、缓冲区无数据就进入读协程队列中等待唤醒。

通过上述代码也能总结出读chan时的注意事项:

1、从未初始化的chan中读取数据会阻塞。
2、从已关闭的chan中读取数据,缓冲区有数据就会读取缓冲区数据,无数据就会得到零值。
3、chan关闭时,从chan中读取数据的第2个参数不一定是false,比如缓冲区有数据就为true。
4、缓冲区无数据时,从chan中读取就会阻塞。

4、总结

通过分析写chan、读chan的源码,我们在写代码时能很容易避免协程阻塞,能很清楚的理解未初始化chan和关闭chan对读写操作的影响。


网站公告

今日签到

点亮在社区的每一天
去签到