Golang | 每日一练 (5)

发布于:2025-03-15 ⋅ 阅读:(23) ⋅ 点赞:(0)

💢欢迎来到张胤尘的技术站
💥技术如江河,汇聚众志成。代码似星辰,照亮行征程。开源精神长,传承永不忘。携手共前行,未来更辉煌💥

Golang | 每日一练 (5)

题目

简述 golang 中的 GMP 模型。

参考答案

为了可以更加深入的了解 golang 中的 GMP 模型,下面我们先从操作系统的线程开始,一步一步的深入探索。

线程与协程

线程是操作系统能够进行调度的最小单位,是进程中的一个执行流。有了线程为什么会另外引入协程? 下面主要从如下三个方面进行说明:

  • 重量级:创建和销毁线程需要操作系统级别的资源分配和回收,开销较大。

  • 调度:线程的调度是由操作系统内核完成,线程切换涉及上下文切换,性能开销较大。

  • 数量限制:由于资源开销大(一个系统线程的内存占用在几MB),一个进程能够创建的线程数量有限(通常在几百到数千级别),一般都会受到操作系统的硬件资源限制。

线程切换

从以上列出的这三点来看,其中我认为最主要的还是线程切换的开销,另外线程切换又离不开线程上下文的切换,那么下面解释一下什么是上下文切换?

上下文切换是指操作系统在切换线程或进程时,保存当前线程或进程的执行状态(上下文),并加载下一个线程或进程的执行状态的过程。上下文切换的目的是让操作系统能够在多个线程或进程之间公平地分配 CPU 时间,从而实现多任务并发执行的效果

这里需要强调一点的是:上面这句话所描述的 “线程或进程” 中的进程,指的是可能会出现切换的两个线程是跨进程的情况。

上下文切换流程主要包括以下几个步骤:

  • 中断触发:上下文切换通常由时钟中断、I/O 中断或系统调用触发。当前线程的时间片用完或有其他原因需要调度器介入时,硬件会发出中断信号。CPU 收到中断信号后,会从用户态切换到内核态,执行内核的中断处理程序。
  • 保存当前线程的上下文:操作系统内核的调度器会保存当前正在运行的线程的上下文信息(包括程序计数器、寄存器、栈指针等)。这些信息通常会保存在当前线程的 TCB中,供以后线程恢复执行时使用。
  • 选择新的线程:内核的调度器根据调度算法从就绪队列中选择一个新的线程。选择新的线程时,调度器会考虑线程的优先级、剩余时间片等因素。
  • 加载新线程的上下文:内核从新选中的线程的 TCB 中加载它的上下文信息,将新线程的寄存器、程序计数器、栈指针等恢复到 CPU 寄存器中。如果是从阻塞态恢复的线程,可能需要加载额外的资源(如文件描述符、内存页表等)。
  • 切换到新线程:内核完成上下文恢复后,CPU 将切换到新线程的程序计数器所指向的代码继续执行。如果新线程正在等待系统调用完成,它可能会切换回用户态执行用户代码。

线程切换时,上下文切换的性能开销主要体现在以下几个方面:

  • 寄存器保存与恢复:需要保存当前线程的寄存器状态,并恢复新线程的寄存器状态。
  • 内存管理单元状态切换:如果线程切换涉及不同进程(进程切换),还需要切换页表等内存管理信息。
  • 调度器开销:操作系统调度器需要选择下一个线程或进程,这本身也有一定的计算开销。
协程的引入

与线程切换不同,协程的切换是在用户态完成的轻量级执行单元切换。其核心是 协作式调度,具体过程如下所示:

  • 主动让出控制权:当前协程在合适的位置(例如:I/O操作等)主动调用 yield 或类似机制,将控制权交还给调度器。
  • 选择下一个协程:调度器根据协程的状态(如就绪、阻塞)选择下一个可运行的协程。
  • 恢复协程状态:调度器切换到新协程的栈指针和寄存器状态,新协程从上次暂停的位置继续执行。

另外,相比较于线程切换来说,协程切换的开销极小,主要体现在以下几个方面:

  • 用户态切换:协程切换在用户态完成,无需进入内核态,避免了内核态切换的开销。
  • 轻量级上下文切换:协程的上下文通常只包括栈指针和少量寄存器状态,切换开销远小于线程切换。
  • 无内核调度器开销:协程调度由程序自身完成,无需依赖操作系统调度器。

在这里插入图片描述

Goroutine

Goroutinegolang 中对于协程的一种实现,Goroutine 相比较于传统的多线程模型的优势主要体现在以下几个方面:

  • 轻量级Goroutine 的创建和销毁开销极低,每个 Goroutine 的初始栈大小仅为 2 KB,且栈大小会根据需要动态调整。

  • 高并发能力:由于资源占用少,golang 程序可以轻松创建数百万甚至上千万个 Goroutine,而不会对系统性能产生显著影响。

  • 高效的调度机制Goroutine 的调度由 golang 运行时完成。运行时采用协作式和抢占式相结合的调度方式,能够高效地在多个Goroutine 之间切换。

  • 语法简单:通过 go 关键字即可启动一个 Goroutine,代码简洁易读。

  • 自动内存管理golang 语言内置了垃圾回收机制,能够自动管理 Goroutine 的内存分配和回收,减少了内存泄漏的风险。

  • 多核支持golang 运行时通过 GMP 模型,能够充分利用多核 CPU 的计算能力,将 Goroutine 分配到不同的 CPU 核心上并行执行。

  • 自动负载均衡:运行时会根据 Goroutine 的运行状态和 CPU 负载情况,动态调整 Goroutine 的分配,确保多核 CPU 的高效利用。

下面先给出一个简单示例,展示如何使用 Goroutine 进行并发编程,如下所示:

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, wg *sync.WaitGroup, ch chan int) {
	defer wg.Done()
	for n := range ch {
		fmt.Printf("Worker %d received: %d\n", id, n)
		time.Sleep(time.Second) // 模拟工作负载
	}
}

func main() {
	const numWorkers = 3
	var wg sync.WaitGroup

	// 创建通道
	ch := make(chan int, 10)

	// 启动多个Goroutine
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, &wg, ch)
	}

	// 向通道发送数据
	for j := 0; j < 10; j++ {
		ch <- j
	}

	// 关闭通道
	close(ch)

	// 等待所有Goroutine完成
	wg.Wait()
}
GMP 模型

golangGoroutine 的调度基于 GMP 模型,即 GoroutineG)、MachineM)和 ProcessorP)的组合。

下面先给出 GMP 模型全景图,如下图所示:

在这里插入图片描述

以上图片只是示例,并非实际运行图,请注意甄别。

G

Goroutine(以下全部简称 G),golang 语言中的轻量级协程,代表一个独立的执行单元。

源码位置:src/runtime/runtime2.go

type g struct {
	//  栈相关字段
	stack       stack 	// stack.lo 栈底;stack.hi 栈顶
	stackguard0 uintptr
	stackguard1 uintptr
	
    // 内层的 panic 信息,如果 G 正在执行 panic 处理,则此字段有效
	_panic    *_panic
    // 内层的 defer 信息,当发生 panic 或函数返回时,会依次执行 defer
	_defer    *_defer
	m         *m	// 当前绑定的 M
	sched     gobuf	// 保存 G 的调度信息,包括栈指针、程序计数器等
    // 用于记录系统调用时的状态,以便在 G 恢复时正确恢复上下文
	syscallsp uintptr
	syscallpc uintptr
	syscallbp uintptr
	stktopsp  uintptr
	param        unsafe.Pointer	// 通用指针字段,用于传递参数
	atomicstatus atomic.Uint32	// G 的状态
	stackLock    uint32
	goid         uint64	// G 的ID
	schedlink    guintptr	// 调度链表链接
	waitsince    int64	// 阻塞的时间戳 
	waitreason   waitReason	// 阻塞原因
	preempt       bool	// 关于抢占的信息
	preemptStop   bool 
	preemptShrink bool 
	// ...
	parkingOnChan atomic.Bool	// 是否正在等待通道操作
	// ...
	parentGoid    uint64	// 创建当前 G 的父 GID
	gopc          uintptr   // 创建当前 G 的调用点    
	ancestors     *[]ancestorInfo 
	startpc       uintptr   // G 的起始函数     
	racectx       uintptr
	waiting       *sudog    // G 正在等待的通道操作 
	// ...
	selectDone    atomic.Uint32  // 是否参与了 select 操作
	// ...
}

以上源代码只是核心字段截取,请注意甄别。

G 的状态

源码位置:src/runtime/runtime2.go

// defined constants
const (
	// G status
	_Gidle = iota // 0	空闲状态,尚未被调度
	_Grunnable // 1	可运行状态,等待被调度到某个M上执行
	_Grunning // 2	正在运行中
	_Gsyscall // 3	系统调用中,当前被阻塞在某个系统调用操作上
	_Gwaiting // 4	等待状态,例如等待通道操作完成
	_Gmoribund_unused // 5	未使用的状态,保留
	_Gdead // 6	结束运行,处于死亡状态
	_Genqueue_unused // 7	未使用的状态,保留
	_Gcopystack // 8	执行栈复制操作(例如栈增长时)
	_Gpreempted // 9	被抢占,需要暂停执行
	
    // 扫描状态是垃圾回收期间使用的状态
    // 这些状态在基础状态的基础上加上了 _Gscan 标志,用于标记 G 在垃圾回收阶段的行为
    _Gscan          = 0x1000	// 基础扫描状态标志,用于垃圾回收阶段
	_Gscanrunnable  = _Gscan + _Grunnable  // 0x1001	可运行状态,且正在被扫描
	_Gscanrunning   = _Gscan + _Grunning   // 0x1002	运行中,且正在被扫描
	_Gscansyscall   = _Gscan + _Gsyscall   // 0x1003	系统调用中,且正在被扫描
	_Gscanwaiting   = _Gscan + _Gwaiting   // 0x1004	等待状态,且正在被扫描
	_Gscanpreempted = _Gscan + _Gpreempted // 0x1009	被抢占,且正在被扫描
)

G 状态变化流程图,如下所示:

在这里插入图片描述

M

代表操作系统线程,是实际执行 G 的实体。

源码位置:src/runtime/runtime2.go

type m struct {
	g0      *g	// 每个 M 的初始 G0,用于执行系统调用和调度器逻辑
	morebuf gobuf  // 用于保存 G 的调度信息
	divmod  uint32 
	_       uint32 
	procid          uint64      // 操作系统线程ID      
	gsignal         *g          // 用于信号处理的 G
	goSigStack      gsignalStack      // golang 的信号处理栈
	sigmask         sigset            // 当前线程的信号掩码
	tls             [tlsSlots]uintptr // 线程局部存储(TLS)的指针
	mstartfn        func()
	curg            *g	// 当前正在执行的 G
	caughtsig       guintptr 	// 捕获的信号
	p               puintptr	// 当前绑定的 P
	nextp           puintptr	// 下一次要执行的 P
	oldp            puintptr 
	id              int64	// M 的唯一标识符
	mallocing       int32	// 是否正在执行内存分配
	throwing        throwType	// 是否正在抛出异常
	preemptoff      string 
	locks           int32	// 当前 M 持有的锁数量
	dying           int32	// M 是否正在退出
	profilehz       int32
	spinning        bool 	// M 是否处于自旋状态
	blocked         bool 	// M 是否被阻塞
	newSigstack     bool 
	printlock       int8	// 打印锁,防止并发打印
	// ...     
	freeWait        atomic.Uint32	// 是否在等待释放
	needextram      bool	// 是否需要额外的 M
	g0StackAccurate bool 
	traceback       uint8	// 回溯信息
	ncgocall        uint64	// Cgo 调用次数   
	ncgo            int32	// Cgo 活跃调用数量   
	cgoCallersUse   atomic.Uint32 
	cgoCallers      *cgoCallers   
	park            note	// 用于阻塞和唤醒 M 的同步原语
	alllink         *m	// M 的全局链表
	schedlink       muintptr	// 调度链表
	lockedg         guintptr	// 当前锁定的 G
	// ...     
	syscalltick uint32	// 系统调用的计数器
	freelink    *m	// 释放链表
	// ...     
	signalPending atomic.Uint32	// 是否有挂起的信号
	// ...     
}

以上源代码只是核心字段截取,请注意甄别。

P

golang 运行时中的调度上下文,负责管理 G 的调度,P 的数量由 GOMAXPROCS 设置,默认等于 CPU 核心数。

源码位置:src/runtime/runtime2.go

type p struct {
	id          int32	// P 的唯一标识符
	status      uint32	// P 的状态
	link        puintptr	// P 的链表链接
	schedtick   uint32     // 调度器的 tick 计数器
	syscalltick uint32     // 系统调用的 tick 计数器
	sysmontick  sysmontick // 系统监控的 tick 计数器
	m           muintptr	// 当前绑定的 M
	mcache      *mcache	// 当前 P 的内存分配缓存
	pcache      pageCache	// 页面缓存
	raceprocctx uintptr	// 用于竞态检测的上下文
	deferpool    []*_defer 	// defer 池
	deferpoolbuf [32]*_defer	// defer 池缓冲区
	goidcache    uint64	// GID 缓存
	goidcacheend uint64	// GID 缓存结束位置
	runqhead uint32	// 运行队列头指针
	runqtail uint32 // 运行队列尾指针
	runq     [256]guintptr	// P 的本地运行队列
	runnext guintptr	// 下一个要运行的 G
	gFree struct {
		gList
		n int32
	}
	sudogcache []*sudog	// sudog 缓存
	sudogbuf   [128]*sudog	// sudog 缓冲区
	mspancache struct {	// mspan 缓存
		len int
		buf [128]*mspan
	}
	pinnerCache *pinner	// 内存页缓存
	trace pTraceState
	palloc persistentAlloc 
    // 垃圾回收相关字段
	gcAssistTime         int64 
	gcFractionalMarkTime int64 
	limiterEvent limiterEvent
	gcMarkWorkerMode gcMarkWorkerMode
	gcMarkWorkerStartTime int64
	gcw gcWork
    
	// ...     
	preempt bool	// 是否允许抢占
	gcStopTime int64	// GC 停止时间
}

以上源代码只是核心字段截取,请注意甄别。

P 的状态

源码位置:src/runtime/runtime2.go

const (
	// P status
	_Pidle = iota	// P 处于空闲状态,没有 G 正在执行,也没有 G 等待执行
	_Prunning	// P 正在运行中,即至少有一个 G 正在该 P 上执行
	_Psyscall	// P 处于系统调用状态,即当前 P 上的 G 正在执行系统调用并被阻塞
	_Pgcstop	// P 处于垃圾回收停止状态,即 P 上的 G 被暂停执行以进行垃圾回收
	_Pdead	// P 已经死亡,不再参与调度,也不会再执行任何 G
)

P 状态变化流程图,如下所示:

在这里插入图片描述

程序启动流程
程序入口

首先给出一段测试代码,如下所示:

package main

import "fmt"

func main() {
	fmt.Println("test start main~")
}

将以上代码生成可执行文件后,查看该文件的文件头信息:

$ objdump -f test

test:     文件格式 elf64-x86-64
体系结构:i386:x86-64, 标志 0x00000112:
EXEC_P, HAS_SYMS, D_PAGED
起始地址 0x000000000046ce80

文件头中记录该程序的起始地址是:0x000000000046ce80,根据该地址找到对应标号表中的内容:

$ objdump -t test | grep '46ce80'
000000000046ce80 g     F .text  0000000000000005 _rt0_amd64_linux

然后找到该汇编代码文件,核心的函数调用内容如下所示:

源码位置:src/runtime/asm_amd64.s

TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
# ...
    CALL	runtime·check(SB)

# ...
    CALL	runtime·args(SB)
    CALL	runtime·osinit(SB)
    CALL	runtime·schedinit(SB)

    // create a new goroutine to start program
    MOVQ	$runtime·mainPC(SB), AX		# 获取 runtime.main 的地址
    PUSHQ	AX
    CALL	runtime·newproc(SB)
    POPQ	AX

    // start this M
    CALL	runtime·mstart(SB)

    CALL	runtime·abort(SB)	// mstart should never return
# ...

以上汇编代码只是部分截取,请注意甄别。

  • runtime·check(SB):用于运行时的初始化检查,确保运行时环境是安全的。
  • runtime·args(SB):用于处理命令行参数,并将它们存储到运行时变量中。
  • runtime·osinit(SB):进行操作系统相关的初始化。
  • runtime·schedinit(SB):初始化 golang 的调度器。
  • runtime·newproc(SB):创建一个新的 G 来运行主程序逻辑。
  • runtime·mstart(SB):启动当前的 M,开始执行 G 调度。
  • runtime·abort(SB):如果 runtime.mstart 返回,说明程序出现了异常,则调用 runtime.abort 函数终止程序。
初始化

golang 程序启动时,首先获取当前系统的 CPU 核心数、获取系统支持的大页内存大小、完成其他底层的初始化动作。

源码位置:src/runtime/os_linux.go

func osinit() {
    // 获取当前系统的 CPU 核心数
	ncpu = getproccount()
    // 获取系统支持的大页内存大小
	physHugePageSize = getHugePageSize()
    // 完成操作系统的底层初始化
	osArchInit()
}

操作系统底层初始化完成后,执行 runtime·schedinit 函数初始化 golang 的调度器,其中对于调度流程来说比较重要的是:

  • mcommoninit 函数会初始化 M0 的一些属性,并将 M0 放入全局链表 allm 中;
  • 根据 GOMAXPROCS 创建和初始化 P 列表。GOMAXPROCS 决定了 P 的数量,通常等于系统的 CPU 核心数。

源码位置:src/runtime/proc.go

func schedinit() {
    // 初始化一系列锁
	// ...

    // 获取当前的 G
    gp := getg()

    // ...
	
    // 设置 M 数量的上限
	sched.maxmcount = 10000

	// ...
	mcommoninit(gp.m, -1)	// M 初始化函数
	// ...
	
    // 设置调度器的处理器数量
    // 加锁
	lock(&sched.lock)
	sched.lastpoll.Store(nanotime())
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
    // 调用 procresize 设置调度器的处理器数量
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
    
     // 解锁
	unlock(&sched.lock)

	// ...
}

以上代码只是部分截取,请注意甄别。

另外,在 schedinit 函数中的 procresize 函数主要用于动态调整运行时的 P 数量。它会根据需要创建新的 P 或销毁多余的 P,并确保调度器的运行状态与新的 P 数量一致。

源码位置:src/runtime/proc.go

// nprocs 是处理器数量
func procresize(nprocs int32) *p {
	// ...

	// 初始化新的 P
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
        // 初始化每个新的 P
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }

	// ...
	
    // 构建空闲 P 链表,将可运行的 P 加入队列
    // 遍历所有处理器,将有本地工作(run queue 不为空)的 P 加入可运行队列
	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		pp := allp[i]
		if gp.m.p.ptr() == pp {
			continue
		}
		pp.status = _Pidle
		if runqempty(pp) {
            // 加入空闲链表
			pidleput(pp, now)
		} else {
            // 将有任务的 P 链接起来
			pp.m.set(mget())
			pp.link.set(runnablePs)
			runnablePs = pp
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs
    // 使用原子操作更新全局变量 gomaxprocs,确保线程安全
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	if old != nprocs {
        // 如果实际改变了处理器数量,通知垃圾回收器的 CPU 限制器,更新其容量
		gcCPULimiter.resetCapacity(now, nprocs)
	}
	return runnablePs
}

以上代码只是部分截取,请注意甄别。

创建 G

newproc 函数专门用于创建一个新的 G 并将其放入运行队列中。此时会将 runtime.main 函数的 G 加入到 P 的本地运行队列中。

源码位置:src/runtime/proc.go

// fn 是一个指向函数值的指针,表示 G 将要运行的函数
func newproc(fn *funcval) {
	gp := getg()	// 获取当前 G 的指针
	pc := getcallerpc()	// 调用者的程序计数器
	systemstack(func() {	// 切换到系统栈(G0 的栈)上执行闭包中的代码
        // 调用 newproc1 函数,创建一个新的 G
        // 但是需要注意,这个并不是一个系统调用
		newg := newproc1(fn, gp, pc, false, waitReasonZero)
		
        // 获取当前 M 绑定的 P 的指针
		pp := getg().m.p.ptr()
        
        // 将新创建的 G 放入 P 的本地运行队列,并需要立即调度
		runqput(pp, newg, true)
	
       	// 如果主函数已经启动(mainStarted 为 true),唤醒一个空闲的 P
		if mainStarted {
			wakep()
		}
	})
}

以上代码只是部分截取,请注意甄别。

其中,newproc1golang 运行时中用于创建新 G 的核心函数。它负责初始化 G 的状态,并将其放入调度队列中。

源码位置:src/runtime/proc.go

func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreason waitReason) *g {
	// 检查传入的函数指针是否为 nil。如果是 nil,则触发运行时错误
    if fn == nil {
		fatal("go of nil func value")
	}
	
    // 调用 acquirem 获取当前的 M
	mp := acquirem()
    // 获取当前 M 绑定的 P 的指针
	pp := mp.p.ptr()
	newg := gfget(pp)	// 尝试从 P 的空闲 G 队列中获取一个 G
    // 如果队列为空,则调用 malg 分配一个新的 G
	if newg == nil {
        // 如果没有可用的 G,调度器会分配一个新的 G,并为其分配初始栈空间
		newg = malg(stackMin)	
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) 
	}
    // 检查新 G 是否分配了栈空间。如果没有分配,则抛出错误
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}
	
    // 检查新 G 的状态是否为 _Gdead。如果不是,则抛出错误
	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}
	// 计算 G 的栈空间大小
	totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize)
    // 对齐栈空间
	totalSize = alignUp(totalSize, sys.StackAlign)
	sp := newg.stack.hi - totalSize
    // 根据架构,初始化 G 的栈帧
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
	}
	if GOARCH == "arm64" {
		// caller's FP
		*(*uintptr)(unsafe.Pointer(sp - goarch.PtrSize)) = 0
	}

	// ...
    
    // 释放当前的 M
	releasem(mp)

    // 返回新创建的 G
	return newg
}

以上代码只是部分截取,请注意甄别。

启动调度

newproc 执行完毕后,执行 mstart 启动 M,开始执行 G 调度。

源码位置:src/runtime/asm_amd64.s

TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME|NOFRAME,$0
	CALL	runtime·mstart0(SB)
	RET // not reached

mstart 汇编代码中调用了 mstart0 函数。

源码位置:src/runtime/proc.go

func mstart0() {
    // 获取当前 G
	gp := getg()

	// ...
    
    // 调用 mstart1,执行 M 的初始化逻辑
	mstart1()

	// ...
}

以上代码只是部分截取,请注意甄别。

mstart1mstart0 调用后执行,启动调度器。

源码位置:src/runtime/proc.go

func mstart1() {
    // 获取当前 M 正在执行的 G
	gp := getg()

	if gp != gp.m.g0 {
        // 确保当前 G 是 M 的初始 G0,如果不是则会抛出异常
        // mstart1 只能由 G0 调用
		throw("bad runtime·mstart")
	}
	
    // 这些设置使得 goexit0 或 mcall 可以通过 gogo 恢复到 mstart0 的调用点,并正确退出线程
	gp.sched.g = guintptr(unsafe.Pointer(gp))	// 设置为当前 G
	gp.sched.pc = getcallerpc()	// 获取调用者的程序计数器
	gp.sched.sp = getcallersp()	// 获取调用者的栈指针
	
    // 初始化与架构相关的运行时状态
	asminit()
    // 初始化 M 的状态
	minit()
	
    // 如果当前 M 是 M0,调用 mstartm0 安装信号处理器
	if gp.m == &m0 {
		mstartm0()
	}
		
    // 允许用户在主线程启动时执行自定义逻辑(例如,初始化 C 代码)
	if fn := gp.m.mstartfn; fn != nil {
		// 线程有一个用户定义的启动函数
        fn()
	}
	
    // 如果当前 M 不是 M0,尝试获取一个 P
	if gp.m != &m0 {
		acquirep(gp.m.nextp.ptr())
		gp.m.nextp = 0
	}
    
    // 调用 schedule 启动调度器,开始执行 G
	schedule()
}

以上代码只是部分截取,请注意甄别。

schedule 负责执行一个调度周期:找到一个可运行的 G 并执行它。这个函数是调度器的主循环,它会不断运行,直到程序退出。

源码位置:src/runtime/proc.go

func schedule() {
    // 获取当前 G 的 M
	mp := getg().m
	
    // 如果当前 M 持有锁,抛出异常
	if mp.locks != 0 {
		throw("schedule: holding locks")
	}
	
    // 如果 M 绑定到一个特定的 G
	if mp.lockedg != 0 {
        // 调用 stoplockedm 停止当前 M ,并执行绑定的 G
		stoplockedm()
        // 执行
		execute(mp.lockedg.ptr(), false)
	}
	
    // 如果当前 M 正在执行 cgo 调用,抛出异常
	if mp.incgo {
		throw("schedule: in cgo")
	}

top:
    // 获取当前 M 的 P
	pp := mp.p.ptr()
	pp.preempt = false
	
    // 如果正在自旋,但本地队列中有可运行的 G,抛出异常
	if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}
	
    // 查找可运行的 G
	gp, inheritTime, tryWakeP := findRunnable()

	// ...
	
    // 如果 M 正在自旋,调用 resetspinning 重置自旋状态
	if mp.spinning {
		resetspinning()
	}

	// ...
	
    // 如果需要唤醒一个 P
	if tryWakeP {
		wakep()
    }
    
    // 如果 G 绑定到一个特定线程,重新查找
	if gp.lockedm != 0 {
		startlockedm(gp)
		goto top
	}
	
    // 调用 execute 执行找到的 G
	execute(gp, inheritTime)
}

以上代码只是部分截取,请注意甄别。

execute 用于启动一个 G 的执行。它的作用是将一个可运行的 G 设置为运行状态,并开始执行其代码。

注意,当首次程序执行到这里时,取出 G 就是 runtime.main,也就是说接下来执行的是 runtime.main 函数。

源码位置:src/runtime/proc.go

func execute(gp *g, inheritTime bool) {
    // 获取当前 G 的 M
	mp := getg().m

	// ...
	
    // 将当前 M 的当前 G 设置为 gp
	mp.curg = gp
    // 将 G 的 M 指针 gp.m 设置为当前线程 mp
	gp.m = mp
    // 使用原子操作将 G 的状态从 _Grunnable 更新为 _Grunning
	casgstatus(gp, _Grunnable, _Grunning)
    // 表示 G 不再等待
	gp.waitsince = 0
    // 避免在刚启动时被抢占
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + stackGuard
    // 如果不继承时间片,增加当前 P 的时间片计数器
    // 从其他 P 的本地队列中获取到的 G,需要继承时间片计数器
    // 用于调度器的公平性调度,确保 G 能够获得足够的时间片
	if !inheritTime {
		mp.p.ptr().schedtick++
	}

	// ...
	
    // 切换到 G 的上下文并执行
    // gogo 是一个底层函数,用于在 G 之间切换栈和程序计数器
	gogo(&gp.sched)
}

以上代码只是部分截取,请注意甄别。

运行到这里时 execute 会调用 gogo,它的作用是从 gobuf 结构中恢复 G 的状态,并跳转到目标 G 的执行点。

源码位置:src/runtime/asm_amd64.s

# func gogo(buf *gobuf)
# restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
	# 从函数参数中获取 gobuf 的地址,并保存到寄存器 BX 中
	MOVQ	buf+0(FP), BX		# gobuf
	# 从 gobuf 中获取 G 的指针(gobuf_g),并保存到寄存器 DX 中
	MOVQ	gobuf_g(BX), DX
	# 检查 G 是否为 nil,确保安全性
	MOVQ	0(DX), CX		# make sure g != nil
	# 跳转到 gogo<>,执行实际的上下文切换逻辑
	JMP	gogo<>(SB)

TEXT gogo<>(SB), NOSPLIT, $0
	get_tls(CX)	# 获取当前线程的线程局部存储
	MOVQ	DX, g(CX) # 将目标 G 的指针保存到 TLS 中的 g 指针
	MOVQ	DX, R14		# 将目标 G 的指针保存到寄存器 R14 中,用于后续操作
	
	# 恢复寄存器状态
	MOVQ	gobuf_sp(BX), SP	# 栈指针
	MOVQ	gobuf_ret(BX), AX	# 返回地址
	MOVQ	gobuf_ctxt(BX), DX	# 上下文指针
	MOVQ	gobuf_bp(BX), BP	# 基指针
	
	# 清理 gobuf
	# 将 gobuf 中的寄存器状态清零
	MOVQ	$0, gobuf_sp(BX)	# clear to help garbage collector
	MOVQ	$0, gobuf_ret(BX)
	MOVQ	$0, gobuf_ctxt(BX)
	MOVQ	$0, gobuf_bp(BX)
	
	# 跳转到目标 G
	MOVQ	gobuf_pc(BX), BX	# 从 gobuf 中恢复目标 G 的程序计数器
	JMP	BX	# 跳转到目标地址,开始执行目标 G
runtime.main

源码位置:src/runtime/proc.go

func main() {
  	// ...
   
	if goarch.PtrSize == 8 {	
		maxstacksize = 1000000000	// 64位系统:1GB
	} else {
		maxstacksize = 250000000	// 32位系统:256MB
	}

	// 设置栈大小的上限,避免分配过大的栈导致崩溃
	maxstackceiling = 2 * maxstacksize
    
    // 可以启动 M 来运行 G
	mainStarted = true

    // ...
    
	// 启动系统监控线程 sysmon
	if haveSysmon {
		systemstack(func() {
			newm(sysmon, nil, -1)
		})
	}

	// 将当前 G 锁定到内核线程 M0,确保某些需要在主线程执行的调用能够正常工作
	lockOSThread()
	
    // 确保 runtime.main 是在 M0 上执行
	if mp != &m0 {
		throw("runtime.main not on m0")
	}

	// ...
	
    // 调用用户定义的 main.main 函数,开始执行用户代码
	fn := main_main
	fn()
	
    // ...

	// 如果程序在退出时有正在运行的 panic 或 defer 函数,等待它们完成
	if runningPanicDefers.Load() != 0 {
		// Running deferred functions should not take long.
		for c := 0; c < 1000; c++ {
			if runningPanicDefers.Load() == 0 {
				break
			}
			Gosched()
		}
	}
	if panicking.Load() != 0 {
		gopark(nil, nil, waitReasonPanicWait, traceBlockForever, 1)
	}
    
    // 运行退出钩子
	runExitHooks(0)
	
    // 如果程序正常结束,调用 exit(0) 退出程序
	exit(0)
	for {
		var x *int32
		*x = 0
	}
}

以上代码只是部分截取,请注意甄别。

到这里时,就已经开始执行业务代码了,那么接下来就是从业务执行的角度来分析 G 的调度流程。


业务执行流程
创建 G

给出一个创建 G 的代码,如下所示:

package main

func printVal(val int) int {
	return 10 + val
}

func main() {
	go printVal(1)
}

将以上代码编译成汇编代码,如下所示:

# ...
0x000e 00014       LEAQ    main.main.gowrap1·f(SB), AX
0x0015 00021       PCDATA  $1, $0
0x0015 00021       CALL    runtime.newproc(SB)
0x001a 00026       ADDQ    $8, SP
0x001e 00030       POPQ    BP
0x001f 00031       NOP
0x0020 00032       RET
# ...

以上汇编代码只是部分截取,请注意甄别。

上述代码中使用 runtime.newproc 来创建一个 G,那么接下来根据之前分析结果,在 newproc 函数中会创建,并且将该 G 放入 P 的本地运行队列,需要立即调度。

源码位置:src/runtime/proc.go

func runqput(pp *p, gp *g, next bool) {
    // 优先将 G 放入 runnext 插槽
	if !haveSysmon && next {
		next = false
	}
	if randomizeScheduler && next && randn(2) == 0 {
		next = false
	}

	if next {
	retryNext:
        // 如果 runnext 插槽已经被占用,则将旧的 runnext G 放入普通队列
		oldnext := pp.runnext
		if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
    // 队列头
	h := atomic.LoadAcq(&pp.runqhead)
    // 队列尾
	t := pp.runqtail
    // 检查本地队列是否已满,大小是256个
	if t-h < uint32(len(pp.runq)) {
        // 如果队列未满,将 G 放入队列尾部
		pp.runq[t%uint32(len(pp.runq))].set(gp)
		atomic.StoreRel(&pp.runqtail, t+1)
		return
	}
    // 如果队列已满,调用 runqputslow 将 G 放入全局队列
	if runqputslow(pp, gp, h, t) {
		return
	}
    
    // 如果放入操作失败,则重新尝试
	goto retry
}

如果当前 P 的本地队列已满会调用 runqputslow 函数,将 G 放入全局队列。

源码位置:src/runtime/proc.go

func runqputslow(pp *p, gp *g, h, t uint32) bool {
    // 定义一个临时数组 batch,大小是本地队列长度的一半加一
	var batch [len(pp.runq)/2 + 1]*g

	// First, grab a batch from local queue.
    // 计算本地队列中可用的 G 数量
	n := t - h
	n = n / 2
	if n != uint32(len(pp.runq)/2) {
        // 如果本地队列未满(即实际数量不等于队列长度的一半),抛出异常
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
        // 从本地队列中取出一半的 G,放入 batch 数组
		batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
	}
    // 使用原子操作,更新本地队列的头指针,表示这些 G 已被取出
	if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
        // 如果操作失败,返回 false
		return false
	}
    // 将传入的 G 添加到批量的末尾
	batch[n] = gp
	
    // 如果启用了调度器随机化,对批量中的 G 进行随机打乱
	if randomizeScheduler {
		for i := uint32(1); i <= n; i++ {
            // 使用 cheaprandn 生成随机索引并交换元素,避免调度的可预测性
			j := cheaprandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

	// Link the goroutines.
   	// 将批量中的 G 链接起来,形成一个双向链表
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}
    // 创建一个 gQueue 对象,表示要放入全局队列的 G 批量
	var q gQueue
    // 设置队列的头指针和尾指针
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// Now put the batch on global queue.
    // 获取全局调度器锁 sched.lock,确保对全局队列的操作是线程安全的
	lock(&sched.lock)
    // 调用 globrunqputbatch 将批量 G 放入全局队列
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
    
    // 如果批量成功放入全局队列,返回 true
	return true
}

当创建好 G 之后,会调用 wakep 函数唤醒空闲的 P。该函数的主要目的是确保有足够的 M 在运行,以处理当前的 G 负载。

源码位置:src/runtime/proc.go

func wakep() {
    // 如果当前存在有自旋 M 或者其他 M 已经尝试过唤醒,直接返回
	if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
		return
	}
	
    // 获取当前 M
	mp := acquirem()

	var pp *p
    // 加锁
	lock(&sched.lock)
    // 尝试获取一个空闲的 P
	pp, _ = pidlegetSpinning(0)
	if pp == nil {
        // 如果没有找到空闲的处理器
		if sched.nmspinning.Add(-1) < 0 {
			throw("wakep: negative nmspinning")
		}
        // 解锁并释放当前 M ,直接返回
		unlock(&sched.lock)
		releasem(mp)
		return
	}
	
    // 解锁
	unlock(&sched.lock)
	
    // 获取到了空闲 P,则调用 startm 启动一个 M,并将其绑定到获取到的处理器
	startm(pp, true, false)
	
    // 释放当前 M
	releasem(mp)
}

wakep 函数中执行了 startm 函数,该函数主要对找到的空闲 P,获取一个 M 并进行绑定。

源码位置:src/runtime/proc.go

func startm(pp *p, spinning, lockheld bool) {
	// ...
    
    // 从 M 的空闲列表中获取
	nmp := mget()
	if nmp == nil {
		// ...
        
        // 创建一个新的 M
		newm(fn, pp, id)

		// ...
		return
	}
	// ...
	nmp.spinning = spinning
	nmp.nextp.set(pp)
    // 唤醒空闲的 M
	notewakeup(&nmp.park)
	releasem(mp)
}

以上代码只是部分截取,请注意甄别。

执行 G

schedule 调度函数中有一个 findRunnable 函数,这个函数专门用于获取待执行的 G

源码位置:src/runtime/proc.go

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    // 获取当前 G 的 M
	mp := getg().m

top:
    // 获取 M 绑定的 P
	pp := mp.p.ptr()
    // 如果调度器正在等待垃圾回收,调用 gcstopm 停止当前 M ,并重新开始查找
	if sched.gcwaiting.Load() {
		gcstopm()
		goto top
	}
	
    // ...
	
    // 从全局队列中获取 G
    // 每间隔 61 次调度,尝试从全局队列中获取一个 G
	if pp.schedtick%61 == 0 && sched.runqsize > 0 {
        // 加锁
		lock(&sched.lock)
        // 从全局队列中获取
		gp := globrunqget(pp, 1)
        // 解锁
		unlock(&sched.lock)
        // 获取 G,直接返回
		if gp != nil {
			return gp, false, false
		}
	}
	
	// ...

    //  从本地队列中取到,直接返回
	if gp, inheritTime := runqget(pp); gp != nil {
		return gp, inheritTime, false
	}
	
    // 本地队列中未取到,从全局队列中取
	if sched.runqsize != 0 {
        // 加锁
		lock(&sched.lock)
         // 从全局队列中获取
		gp := globrunqget(pp, 0)
        // 解锁
		unlock(&sched.lock)
         // 获取 G,直接返回
		if gp != nil {
			return gp, false, false
		}
	}
	
    // 处理网络轮询
	if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
        // 如果网络轮询初始化完成且有等待的 G
		if list, delta := netpoll(0); !list.empty() { // non-blocking
            // 获取 G
			gp := list.pop()
			injectglist(&list)
			netpollAdjustWaiters(delta)
			trace := traceAcquire()
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.ok() {
				trace.GoUnpark(gp, 0)
				traceRelease(trace)
			}
            // 如果找到 G,返回
			return gp, false, false
		}
	}

    // 如果当前 M 正在自旋或者系统中自旋 M 的数量少于 GOMAXPROCS/2,则当前 M 可以进入自旋状态
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
		if !mp.spinning {
     		// 当前 M 成为自旋状态
			mp.becomeSpinning()
		}
		// 尝试从其他 P 中“偷取” G
		gp, inheritTime, tnow, w, newWork := stealWork(now)
        // 获取 G,直接返回
		if gp != nil {
			return gp, inheritTime, false
		}
		
        // ...
	}

	// 处理 GC 标记任务
    // ...

	// 处理空闲状态
    // ...
	
    // 释放处理器
	lock(&sched.lock)
	if sched.gcwaiting.Load() || pp.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
    // 如果全局队列中有任务,尝试从中获取一个 G
	if sched.runqsize != 0 {
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		return gp, false, false
	}
    // 如果当前 M 不是自旋状态,但调度器需要自旋 M,尝试将当前 M 标记为“自旋”
	if !mp.spinning && sched.needspinning.Load() == 1 {
		mp.becomeSpinning()
		unlock(&sched.lock)
		goto top
	}
	if releasep() != pp {
		throw("findrunnable: wrong p")
	}
    // 释放当前 P,并将其放入空闲队列
	now = pidleput(pp, now)
	unlock(&sched.lock)
    
	wasSpinning := mp.spinning
	if mp.spinning {
        // 如果当前 M 处于自旋状态,则将其标记为非自旋
		mp.spinning = false
		if sched.nmspinning.Add(-1) < 0 {
			throw("findrunnable: negative nmspinning")
		}
		
        // 加锁
        // 重新检查全局队列和本地队列
		lock(&sched.lock)
        // 如果全局队列中有 G,尝试获取一个空闲的 P,并从中获取一个 G
		if sched.runqsize != 0 {
            // 获取 P
			pp, _ := pidlegetSpinning(0)
			if pp != nil {
                // 获取 G
				gp := globrunqget(pp, 0)
				if gp == nil {
					throw("global runq empty with non-zero runqsize")
				}
                // 解锁
				unlock(&sched.lock)
				acquirep(pp)
				mp.becomeSpinning()
                // 返回
				return gp, false, false
			}
		}
        // 如果全局队列中还是没有G,解锁
		unlock(&sched.lock)
        
        // 这里起始有个疑问:为什么这里就先检查的是全局队列?不是先检查所有 P 的本地队列?
        // 比如先遍历所有 P,找到有 G 的 P,然后直接返回 top,开始获取
        // 如果所有的 P 都没有,再从全局队列中再获取 G
		
        // 检查所有 P 的本地队列
        // 如果找到 G,获取对应的处理器
		pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
		if pp != nil {
			acquirep(pp)
            // 当前 M 成为自旋状态
			mp.becomeSpinning()
            // 从顶部开始,重新检查
			goto top
		}
		
        // 检查是否有空闲优先级的 GC G
        
        // 如果找到 G,获取对应的 P
		pp, gp := checkIdleGCNoP()
		if pp != nil {
			acquirep(pp)
            // 恢复 M 的自旋状态
			mp.becomeSpinning()

			pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
			trace := traceAcquire()
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.ok() {
				trace.GoUnpark(gp, 0)
				traceRelease(trace)
			}
             // 并返回找到的 G
			return gp, false, false
		}
        
        // ...
	}

	// ...
	
    // 如果没有找到可运行的 G,停止当前 M,进入休眠状态
    // 1. 调用 mput 将当前 M 放入空闲队列
    // 2. 调用 mPark 将当前 M 放入休眠状态,直到被唤醒
    // 3. 清空 gp.m.nextp,当前 M 不再持有下一个处理器
	stopm()
    
    // 重新开始查找
	goto top
}

以上代码只是部分截取,请注意甄别。

  • 每间隔 61 次调度,尝试从全局队列中获取一个 G;其余时候 M 会从绑定的 P 的本地队列中获取 G 并执行。

  • P 的本地队列为空时,会执行 globrunqget 函数,该函数专门用于从全局队列中获取 G

源码位置:src/runtime/proc.go

func globrunqget(pp *p, max int32) *g {
	assertLockHeld(&sched.lock)
	// 如果全局队列为空,直接返回 nil
	if sched.runqsize == 0 {
		return nil
	}
	
    // 计算要从全局队列中获取的 G 数量
    // 默认情况下,每个 P 可以从全局队列中获取 sched.runqsize / gomaxprocs + 1 个 G,以确保负载均衡
	n := sched.runqsize/gomaxprocs + 1
    // 如果计算的 n 超过了全局队列的大小,则取全局队列的大小
	if n > sched.runqsize {
		n = sched.runqsize
	}
    // 如果设置了 max,则 n 不能超过 max
	if max > 0 && n > max {
		n = max
	}
    // 如果 n 超过了本地队列大小的一半,则取本地队列大小的一半
	if n > int32(len(pp.runq))/2 {
		n = int32(len(pp.runq)) / 2
	}
	
    // 从全局队列大小中减去即将获取的 G 数量
	sched.runqsize -= n
	
    // 从全局队列中弹出一个 G,并将其保存到 gp 中
	gp := sched.runq.pop()
	n--
    // 如果还需要获取更多 G,则继续从全局队列中弹出,并将它们放入目标 P 的本地队列中
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(pp, gp1, false)
	}
    // 返回从全局队列中获取的第一个 G
	return gp
}
  • 如果全局队列不空时,则根据以下公式取出 nG 加入到 P 的本地队列中。

n = m i n ( l e n ( 全局队列 ) / G O M A X P R O C S + 1 , c a p ( 本地队列 ) / 2 ) n = min(len(全局队列) / GOMAXPROCS + 1, cap(本地队列) / 2 ) n=min(len(全局队列)/GOMAXPROCS+1,cap(本地队列)/2)

  • 如果全局队列也为空,则从网络轮询事件中取 G

  • 如果获取到 G 则直接进入调度阶段。

  • 如果还是未获取到 G 并且当前系统中自旋 M 的数量小于 GOMAXPROCS/2,则 M 会进入自旋状态。自旋状态的目的是让 M 快速尝试获取任务,而不是直接进入休眠,从而减少上下文切换的开销。

  • 在自旋状态下,通过 stealWork 调用函数尝试从其他 P 的本地队列中窃取 G 执行。

源码位置:src/runtime/proc.go

func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
    // 获取当前 M 绑定的 P
	pp := getg().m.p.ptr()

	// ...

   	// 尝试最多4次
	const stealTries = 4
	for i := 0; i < stealTries; i++ {
		stealTimersOrRunNextG := i == stealTries-1
		
        // 使用 stealOrder 随机遍历所有 P,以避免总是从相同的处理器中偷取 G
		for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
            // 如果调度器正在等待垃圾回收,表示可能有新的 G
			if sched.gcwaiting.Load() {
				// GC work may be available.
				return nil, false, now, pollUntil, true
			}
	
			// ... 
			
            // 如果目标 P 不是空闲的,尝试从其本地队列中偷取 G
			if !idlepMask.read(enum.position()) {
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}
	
    // 如果没有找到可运行的 G,返回 nil 和其他相关信息
	return nil, false, now, pollUntil, ranTimer
}

以上代码只是部分截取,请注意甄别。

  • 如果从其他 P 的本地队列中窃取到 G,则直接进入调度阶段。
  • 如果还是没有从其他 P 的本地队列中窃取到 G,则首先会将当前 M 重新标记为非自旋 M,减少全局自旋 M 的数量,然后再一次检查全局队列和本地队列是否可以获取到 G
  • 当此时从全局队列、本地队列、空闲优先级的 GCG、网络轮询获取到了 G,则恢复 M 的自旋状态,进入调度阶段。
  • 如果没有找到可运行的 G,调用 stopm 函数将当前 M 放入空闲队列,并进入休眠状态。

源码位置:src/runtime/proc.go

func stopm() {
	gp := getg()

	// ...

	lock(&sched.lock)
	mput(gp.m)
	unlock(&sched.lock)
	mPark()
	acquirep(gp.m.nextp.ptr())
	gp.m.nextp = 0
}

以上代码只是部分截取,请注意甄别。

调度执行
  • 当获取到 G 后,在 execute 函数中执行 gogo 函数切换到该 G 的栈,此时,M 会保存当前 G0 的上下文,并恢复目标 G 的上下文,然后开始执行 G

  • golangsysmon 是一个独立的后台监控线程,它负责监控和管理 G 的执行情况,确保调度的公平性和系统的高效运行。如果某个 G 运行时间超过 10 ms,sysmon 会触发抢占,将 G 重新放入全局队列中。

源码位置:src/runtime/proc.go

func sysmon() {
	// ...

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)

	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
        
		// ...

		lock(&sched.sysmonlock)
        
		// ...
        
		// 
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		
        // ...
        
		unlock(&sched.sysmonlock)
	}
}

以上代码只是部分截取,请注意甄别。

  • sysmon 函数通过定时调用 retake 函数强制停止长时间运行的 G,以避免某些 G 占用过多的 CPU 时间,影响其他 G 的执行。

源码位置:src/runtime/proc.go

// 定义了 G 在没有被抢占的情况下可以运行的最大时间
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	n := 0
	// 加锁
	lock(&allpLock)
	
    // 遍历所有处理器
	for i := 0; i < len(allp); i++ {
        // 获取当前处理器
		pp := allp[i]
		if pp == nil {
			continue
		}
		pd := &pp.sysmontick
        // 获取处理器的状态
		s := pp.status
		sysretake := false
        // 如果处理器正在运行或在系统调用中
		if s == _Prunning || s == _Psyscall {
			t := int64(pp.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
                // 如果 G 运行时间超过 10 毫秒,调用 preemptone 强制抢占
				preemptone(pp)
				sysretake = true
			}
		}
        // 如果处理器阻塞在系统调用中
		if s == _Psyscall {
			t := int64(pp.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
            
			if runqempty(pp) 
            	&& sched.nmspinning.Load()+sched.npidle.Load() > 0 
            	&& pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			unlock(&allpLock)
			incidlelocked(-1)
			trace := traceAcquire()
            // 将处理器的状态从 _Psyscall 改为 _Pidle
			if atomic.Cas(&pp.status, s, _Pidle) {
				if trace.ok() {
					trace.ProcSteal(pp, false)
					traceRelease(trace)
				}
                // 回收 P 的数量加1
				n++
				pp.syscalltick++
                // 将一个 P 从当前 M 释放出来,并将其放入空闲队列中
				handoffp(pp)
			} else if trace.ok() {
				traceRelease(trace)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
    
    // 释放锁
	unlock(&allpLock)
 
    // 返回回收的处理器数量
	return uint32(n)
}

以上汇编代码只是部分截取,请注意甄别。

  • 如果 G 遇到阻塞操作(如:I/O操作、系统调用等),则对应的 M 内核线程也会陷入阻塞状态。
  • 如果此时队列中存在未执行的 G,则调度器会将这个阻塞的内核线程 MP 中剔除。
  • 然后从空闲 M 队列中取一个 M,重新绑定 P;如果不存在空闲的 M,则会创建一个新的 M,再重新绑定 P
  • 接着从本地队列中或者全局队列中取未执行的 G 运行。
  • 当陷入阻塞的内核线程 M 恢复后,会尝试恢复执行。此时,M 需要重新获取一个 P 来继续执行 G
  • M 会尝试从空闲 P 队列中获取一个空闲的 P。如果成功获取到 PM 会将该 P 绑定到自己,并继续执行 G;如果当前没有空闲的 PM 会进入休眠状态。
  • 此时,原本在 M 上执行的 G 会被放入全局队列中,等待其他 M 获取空闲的 P 后重新调度。
执行结束
  • 当某一个 G 执行完成时,它会调用 goexit 函数。

源码位置:src/runtime/asm_amd64.s

TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME|NOFRAME,$0-0
	BYTE	$0x90	# NOP
	CALL	runtime·goexit1(SB)	# does not return
	BYTE	$0x90	# NOP
  • goexit 调用 runtime.goexit1,该函数负责清理当前 G 的资源,M 会切换回 G0

源码位置:src/runtime/proc.go

func goexit1() {
	// ...
    // 调用 mcall,将控制权切换到 goexit0 函数
    // 
	mcall(goexit0)
}

以上代码只是部分截取,请注意甄别。

  • goexit0goexit1 的延续,运行在 g0 上,该函数主要做两件事:

    • 调用 gdestroy 函数,销毁当前 G 的资源,包括释放栈空间。

    • 调用 schedule 函数,将控制权交还给调度器,让调度器选择下一个可运行的 G

源码位置:src/runtime/proc.go

// goexit continuation on g0.
func goexit0(gp *g) {
	gdestroy(gp)
	schedule()
}
  • 如果程序中没有可运行的 G,且主 G 执行完毕,程序会退出。

🌺🌺🌺撒花!

如果本文对你有帮助,就点关注或者留个👍
如果您有任何技术问题或者需要更多其他的内容,请随时向我提问。

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到