聊聊Go语言中的networkpoller工作机制

发布于:2024-05-22 ⋅ 阅读:(133) ⋅ 点赞:(0)

写在文章开头

go语言通过goroutine-per-connection的设计思想实现了高性能网络并发模型,本文以Linux服务器的角度详解go语言中的Networkpoller对于Linux层面的网络socket的封装。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

详解Go语言中的networkpoller工作机制

go语言网络程序基础示例

为保证后续源码解读的连贯性,笔者给出一段go语言中网络程序的编写基础示例:

func main() {
	//创建socket,监听8080端口
	listen, err := net.Listen("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}
	for {
		//基于epoll监听连接
		conn, err := listen.Accept()
		if err != nil {
			log.Fatal(err)
		}
		//收到连接后对该socket写入hello
		go func() {
			//读取8字节数据
			var buf [8]byte
			n, _ := conn.Read(buf[0:])
			fmt.Println("read data", string(buf[0:n]))

			//基于epoll进行写操作
			conn.Write([]byte("hello"))
			//关闭连接
			conn.Close()
		}()

	}

}

对创建的socket封装及管理

我们调用net包下的Listen函数,其底层本质上完成了如下几件事:

  1. 创建监听socket
  2. 将监听socket的感兴趣的读写事件到epoll网络轮询器上。

这其中go语言对于socket做了抽象的封装,我们完成socket创建之后,它会将基于这个socket的文件描述符,将其封装为pollDesc并存到一个列表中,后续一旦pollDesc对应的socket有就绪事件时就会定位到这个pollDesc的协程,让其处理当前socket的网络IO事件:

在这里插入图片描述

这里我们以go语言对于Linuxsocket的封装为里给出fd_poll_runtime.go下对于socket的封装操作逻辑,其逻辑和我上文所说差不多,调用runtime_pollServerInit进行netpoll模型初始化,然后通过runtime_pollOpensocket的文件描述符抽象成pollDesc并存到pollcache列表中:

func (pd *pollDesc) init(fd *FD) error {
	//初始化当前协程的Networkpoller
	serverInit.Do(runtime_pollServerInit)
	//基于socket的文件描述符fd将其封装成pollDesc存入pollCache链表中统一管理
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}

上文runtime_pollServerInit只是抽象定义,我们可以通过全局搜索定位到该定义在Linux上编译链接后的方法即netpoll.gopoll_runtime_pollServerInit可以看到其逻辑比较简单,通过原子类上锁后对netpoll进行初始化:

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	//原子类上锁
	if netpollInited.Load() == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited.Load() == 0 {
			//初始化netpoll
			netpollinit()
			netpollInited.Store(1)
		}
		//解锁
		unlock(&netpollInitLock)
	}
}

这里我们简单介绍一下pollDesc,它本质就是对于socket的封装,这其中有几个比较重要的关键字wgrg,如果当前socket获取读时间为空时,这个socket对应的协程地址就会被记录到rg 中,后续当前socket有就绪的读事件就可以通过pollDescrg定位到对应协程将其放到就绪列表等待被处理,这些操作完成后将标识为pdReady,后续主协程处理看到这个标识就会定位到对应的socket处理读事件:

在这里插入图片描述

基于上述为pollDesc的描述我们可以在netpoll.go定位到poll_runtime_pollOpen方法看到pollDesc的初始化逻辑,可以看到它基于上文传入的socket文件描述符fd将其封装为pollDesc并存入pollcache链表中,而rgwg都默认为nil状态:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	//从创建一个pollDesc并分配给当前socket
	pd := pollcache.alloc()
	lock(&pd.lock)
	wg := pd.wg.Load()
	
	//......
	
	rg := pd.rg.Load()

	
	//......
	
	//基于socket描述符fd
	pd.fd = fd

	
	//......
	//读事件标识事件初始化为空
	pd.rg.Store(pdNil)
	
	//......
	//写事件标识设置为空
	pd.wg.Store(pdNil)
	//......
	unlock(&pd.lock)

	//......
	return pd, 0
}

处理未就绪读写socket

当我们和客户端建立连接之后,我们的客户端协程就会调用连接对象的Read方法到内核中查看是否获取就绪的数据,如果没数据就会返回EAGAIN代表当前socket没有就绪数据可读,这个socket对应的pollDesc的rg(读状态标识)或者wg(写状态标识)设置为pdWait状态,再将该协程挂起,等待事件就绪被唤醒,这一点写未就绪也是一样的:

在这里插入图片描述

我们以的Read方法为例,它位于net.go下,查看源码可知它本质就是拿到当前连接的文件描述符fd调用其read方法获取就绪数据:

func (c *conn) Read(b []byte) (int, error) {
	//......
	//查看是否有就绪数据,返回数据长度
	n, err := c.fd.Read(b)
	//......
	return n, err
}

步入其底层实现可以看到系统级调用发现没有就绪数据err就会返回EAGAIN ,此时就会调用waitRead将当前socket设置为等待状态:

func (fd *FD) Read(p []byte) (int, error) {
	//......
	for {
		//查看当前fd对应的socket是否有就绪数据
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			//如果err返回EAGAIN 则说明没有就绪数据,则调用waitRead将其挂起
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

最终这段逻辑就会走到编译后链接到runtime_pollWait的方法即位于netpoll.go下的poll_runtime_pollWait,它会将当前socket的读写标识先清空即设置为pdNil然后再改为pdWait,然后将当前socket对应的协程用gopark方法挂起,等待有就绪事件时唤醒:

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	//......
	//调用netpollblock将socket状态设置为阻塞态,并将对应协程挂起
	for !netpollblock(pd, int32(mode), false) {
			//......
	}
	return pollNoError
}



func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	//查看gpp是读还是写,从而获取对应的rg或者wg
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// 通过原子交换的方式修改当前socket对应的rg或者wg为等待状态pdWait
	for {
		// Consume notification if already ready.
		if gpp.CompareAndSwap(pdReady, pdNil) {
			return true
		}
		if gpp.CompareAndSwap(pdNil, pdWait) {
			break
		}

		//......
	}

	//将当前协程挂起
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	//......
	return old == pdReady
}

唤醒就绪socket

一旦这些事件就绪,go语言主协程的调用netpoll就会得到这些就绪事件,此时我们就可以通过定位到socketpollDesc,将其rg(读标识)或者wg(写标识)设置的pdReady意为有就绪事件,然后将这个socket对应的协程放入toRun即待运行列表,等待被调度处理事件:

在这里插入图片描述

这段描述我们可以在netpoll.go的netpoll得以印证,可以看到它调用epoll_wait得到就绪事件后,就会基于这个事件定位到pollDesc调用调用netpollready修改rg或wg设置为就绪态pdReady,并将协程存入就绪队列toRun

func netpoll(delay int64) gList {
	//......
	var events [128]syscall.EpollEvent
retry:
	//调用epoll_wait获取就绪的事件
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
	//......
	}
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := events[i]
		if ev.Events == 0 {
			continue
		}

		if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
			//......
			continue
		}

		var mode int32
		//根据事件类型设置对应读写标识rw
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			//调用netpollready修改rg或wg设置为就绪态pdReady,并将协程存入就绪队列toRun
			pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
			pd.setEventErr(ev.Events == syscall.EPOLLERR)
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

于是我们就步入netpoll.gonetpollready方法,就可以看到我们上图所说明的状态修改和协程入队的操作,它通过netpollunblockrg或者rg设置为pdReady就绪态并返回协程地址,然后将这个地址对应的协程存入待处理的协程队列toRun

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	//根据读写事件调用netpollunblock修改rg或者wg状态,并返回协程地址
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	//如果rg不为空,说明读事件就绪,将该
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}



func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		
		//......
		var new uintptr
		if ioready {
			new = pdReady
		}
		//将状态通过原子交换修改为就绪态pdReady
		if gpp.CompareAndSwap(old, new) {
			if old == pdWait {
				old = pdNil
			}
			//返回协程地址
			return (*g)(unsafe.Pointer(old))
		}
	}
}

小结

自此我们完整的将Go语言中的networkpoller的设计和源码都分析完成了,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

Linux中的EAGAIN含义:https://www.cnblogs.com/big-xuyue/p/3905597.html


网站公告

今日签到

点亮在社区的每一天
去签到