写在文章开头
go语言
通过goroutine-per-connection
的设计思想实现了高性能网络并发模型,本文以Linux
服务器的角度详解go语言
中的Networkpoller
对于Linux
层面的网络socket
的封装。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
详解Go语言中的networkpoller工作机制
go语言网络程序基础示例
为保证后续源码解读的连贯性,笔者给出一段go语言中网络程序的编写基础示例:
func main() {
//创建socket,监听8080端口
listen, err := net.Listen("tcp", "localhost:8080")
if err != nil {
log.Fatal(err)
}
for {
//基于epoll监听连接
conn, err := listen.Accept()
if err != nil {
log.Fatal(err)
}
//收到连接后对该socket写入hello
go func() {
//读取8字节数据
var buf [8]byte
n, _ := conn.Read(buf[0:])
fmt.Println("read data", string(buf[0:n]))
//基于epoll进行写操作
conn.Write([]byte("hello"))
//关闭连接
conn.Close()
}()
}
}
对创建的socket封装及管理
我们调用net
包下的Listen
函数,其底层本质上完成了如下几件事:
- 创建监听
socket
。 - 将监听
socket
的感兴趣的读写事件到epoll
网络轮询器上。
这其中go语言
对于socket
做了抽象的封装,我们完成socket
创建之后,它会将基于这个socket
的文件描述符,将其封装为pollDesc
并存到一个列表中,后续一旦pollDesc
对应的socket
有就绪事件时就会定位到这个pollDesc
的协程,让其处理当前socket
的网络IO事件:
这里我们以go语言
对于Linux
中socket
的封装为里给出fd_poll_runtime.go
下对于socket
的封装操作逻辑,其逻辑和我上文所说差不多,调用runtime_pollServerInit
进行netpoll
模型初始化,然后通过runtime_pollOpen
将socket
的文件描述符抽象成pollDesc
并存到pollcache
列表中:
func (pd *pollDesc) init(fd *FD) error {
//初始化当前协程的Networkpoller
serverInit.Do(runtime_pollServerInit)
//基于socket的文件描述符fd将其封装成pollDesc存入pollCache链表中统一管理
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
上文runtime_pollServerInit
只是抽象定义,我们可以通过全局搜索定位到该定义在Linux
上编译链接后的方法即netpoll.go
的poll_runtime_pollServerInit
可以看到其逻辑比较简单,通过原子类上锁后对netpoll进行初始化:
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
//原子类上锁
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
//初始化netpoll
netpollinit()
netpollInited.Store(1)
}
//解锁
unlock(&netpollInitLock)
}
}
这里我们简单介绍一下pollDesc
,它本质就是对于socket
的封装,这其中有几个比较重要的关键字wg
和rg
,如果当前socket
获取读时间为空时,这个socket
对应的协程地址就会被记录到rg
中,后续当前socket
有就绪的读事件就可以通过pollDesc
的rg
定位到对应协程将其放到就绪列表等待被处理,这些操作完成后将标识为pdReady
,后续主协程处理看到这个标识就会定位到对应的socket
处理读事件:
基于上述为pollDesc
的描述我们可以在netpoll.go
定位到poll_runtime_pollOpen
方法看到pollDesc
的初始化逻辑,可以看到它基于上文传入的socket
文件描述符fd
将其封装为pollDesc
并存入pollcache
链表中,而rg
和wg
都默认为nil
状态:
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
//从创建一个pollDesc并分配给当前socket
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
//......
rg := pd.rg.Load()
//......
//基于socket描述符fd
pd.fd = fd
//......
//读事件标识事件初始化为空
pd.rg.Store(pdNil)
//......
//写事件标识设置为空
pd.wg.Store(pdNil)
//......
unlock(&pd.lock)
//......
return pd, 0
}
处理未就绪读写socket
当我们和客户端建立连接之后,我们的客户端协程就会调用连接对象的Read
方法到内核中查看是否获取就绪的数据,如果没数据就会返回EAGAIN
代表当前socket
没有就绪数据可读,这个socket
对应的pollDesc
的rg(读状态标识)或者wg(写状态标识)设置为pdWait
状态,再将该协程挂起,等待事件就绪被唤醒,这一点写未就绪也是一样的:
我们以的Read
方法为例,它位于net.go
下,查看源码可知它本质就是拿到当前连接的文件描述符fd
调用其read
方法获取就绪数据:
func (c *conn) Read(b []byte) (int, error) {
//......
//查看是否有就绪数据,返回数据长度
n, err := c.fd.Read(b)
//......
return n, err
}
步入其底层实现可以看到系统级调用发现没有就绪数据err
就会返回EAGAIN
,此时就会调用waitRead
将当前socket
设置为等待状态:
func (fd *FD) Read(p []byte) (int, error) {
//......
for {
//查看当前fd对应的socket是否有就绪数据
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
//如果err返回EAGAIN 则说明没有就绪数据,则调用waitRead将其挂起
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
最终这段逻辑就会走到编译后链接到runtime_pollWait
的方法即位于netpoll.go
下的poll_runtime_pollWait
,它会将当前socket的读写标识先清空即设置为pdNil
然后再改为pdWait
,然后将当前socket对应的协程用gopark
方法挂起,等待有就绪事件时唤醒:
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
//......
//调用netpollblock将socket状态设置为阻塞态,并将对应协程挂起
for !netpollblock(pd, int32(mode), false) {
//......
}
return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
//查看gpp是读还是写,从而获取对应的rg或者wg
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 通过原子交换的方式修改当前socket对应的rg或者wg为等待状态pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
//......
}
//将当前协程挂起
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
//......
return old == pdReady
}
唤醒就绪socket
一旦这些事件就绪,go语言
主协程的调用netpoll就会得到这些就绪事件,此时我们就可以通过定位到socket
的pollDesc
,将其rg(读标识)或者wg(写标识)
设置的pdReady
意为有就绪事件,然后将这个socket
对应的协程放入toRun
即待运行列表,等待被调度处理事件:
这段描述我们可以在netpoll.go的netpoll得以印证,可以看到它调用epoll_wait
得到就绪事件后,就会基于这个事件定位到pollDesc
调用调用netpollready
修改rg或wg设置为就绪态pdReady
,并将协程存入就绪队列toRun
:
func netpoll(delay int64) gList {
//......
var events [128]syscall.EpollEvent
retry:
//调用epoll_wait获取就绪的事件
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
//......
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
//......
continue
}
var mode int32
//根据事件类型设置对应读写标识rw
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
//调用netpollready修改rg或wg设置为就绪态pdReady,并将协程存入就绪队列toRun
pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
pd.setEventErr(ev.Events == syscall.EPOLLERR)
netpollready(&toRun, pd, mode)
}
}
return toRun
}
于是我们就步入netpoll.go
的netpollready
方法,就可以看到我们上图所说明的状态修改和协程入队的操作,它通过netpollunblock
将rg
或者rg
设置为pdReady
就绪态并返回协程地址,然后将这个地址对应的协程存入待处理的协程队列toRun
:
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
//根据读写事件调用netpollunblock修改rg或者wg状态,并返回协程地址
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
//如果rg不为空,说明读事件就绪,将该
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
//......
var new uintptr
if ioready {
new = pdReady
}
//将状态通过原子交换修改为就绪态pdReady
if gpp.CompareAndSwap(old, new) {
if old == pdWait {
old = pdNil
}
//返回协程地址
return (*g)(unsafe.Pointer(old))
}
}
}
小结
自此我们完整的将Go语言中的networkpoller的设计和源码都分析完成了,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
参考
Linux中的EAGAIN含义:https://www.cnblogs.com/big-xuyue/p/3905597.html