15 go语言(golang) - 并发编程goroutine原理及数据安全

发布于:2024-11-27 ⋅ 阅读:(18) ⋅ 点赞:(0)

底层原理

Go 的 goroutine 是一种轻量级的线程实现,允许我们在程序中并发地执行函数。与传统的操作系统线程相比,goroutine 更加高效和易于使用。

轻量级调度

  • 用户态调度:Go 运行时提供了自己的调度器,这意味着 goroutine 的创建和切换是在用户空间完成的,而不需要操作系统内核参与。这使得 goroutine 切换比传统线程更快。

  • M:N 模型:Go 使用 M:N 调度模型,将 M 个 goroutines 映射到 N 个操作系统线程上。这种方式可以有效利用多核 CPU,同时保持每个 OS 线程上的多个并发任务。

栈管理

  • 动态栈大小:goroutines 从一个很小的栈开始(大约几 KB),而不是像传统线程那样分配较大的固定栈(通常为 MB)。当需要更多空间时,goroutine 的栈会自动增长,并在不再需要时收缩。

  • 避免内存浪费:这种动态调整机制避免了大量未使用内存的浪费,使得同时运行大量 goroutines 成为可能。

Goroutine 调度器

  • Goroutines、OS Threads 和 Processors ( P )

    • G 表示 Goroutine。
    • M 表示 Machine,对应实际的 OS Thread。
    • P 表示 Processor,是 Go runtime 中用于执行 Go code 的抽象概念。每个 P 持有一个本地 run queue 来管理待执行的 Gs。
  • 在 Go 程序启动时,会根据可用 CPU 核心数创建对应数量的 P,每个 P 会绑定到一个 M 上来执行 Gs。当某个 G 阻塞或完成后,P 可以将其切换出去并从队列中选择下一个 G 执行。

  • G 是具体要完成工作的单元,M 是实际执行工作的实体,而 P 则是提供环境和条件以便高效调度这些工作的抽象层。

Goroutine ( G )

  • 定义:每个 goroutine 是一个独立执行的函数,类似于轻量级线程。
  • 结构:在实现中,goroutine 是一个包含栈、程序计数器和其他调度信息的数据结构。
  • 栈管理:goroutines 使用动态栈,可以从很小(几 KB)开始,并根据需要增长和收缩。这种灵活性允许同时运行大量 goroutines,而不会浪费内存。

Machine ( M )

  • 定义M 表示 Machine,是与操作系统线程直接对应的实体。
  • 职责
    • 执行分配给它们的 G
    • 管理与操作系统交互,如进行系统调用时阻塞或唤醒 OS 线程。

Processor ( P )

  • 定义P 表示 Processor,是 Go runtime 中用于执行 Go code 的抽象概念。

  • 数量控制

    • 在程序启动时,Go runtime 会创建若干个 P,默认数量等于机器上的 CPU 核心数,但可以通过 runtime.GOMAXPROCS(n) 函数调整。
  • 职责

    • 每个 P 持有一个本地 run queue,用来存储待执行的 Gs(goroutines)。
    • 调度器会从这些队列中选择 G 并将其分配给 M 执行。

调度机制

  1. 工作窃取算法(Work Stealing)

    • 每个 P 都有自己的本地队列,当某个 P 的队列为空时,它可以从其他 P 那里“窃取”一些任务来执行。这种机制提高了负载均衡,有助于充分利用多核处理器资源。
  2. 全局运行队列

    • 除了每个 P 的本地队列外,还有一个全局运行队列。当新建 G 或者当某些情况下无法放入本地队列时,会被放入全局运行队列。空闲 M 可以从这里获取新的任务以保持忙碌状态。
  3. 抢占式调度

    • Goroutines 是抢占式调度,这意味着长时间运行且不释放 CPU 的 Goroutine 可以被暂停,以便让其他 Goroutines 获得执行机会。这避免了一些 Goroutines 独占 CPU 而导致其他任务饥饿的问题。
  4. 阻塞处理

    • 当 G 被阻塞(例如等待 I/O 操作),M 会将该 G 挂起并尝试获取另一个可用 G 来继续工作。如果没有可用 G,则可能会休眠或终止该 M,以节省资源。

调度流程

在这里插入图片描述

  1. 获取 P

    • 当一个线程 M 想要运行时,它必须首先获取一个 P。P 是 CPU 的抽象,限制了并发运行的最大数量(通常等于 GOMAXPROCS)。
  2. 从本地队列获取 G

    • 一旦 M 和 P 关联,M 会尝试从其关联的 P 的本地队列(LRQ)中获取下一个待执行的 G。如果找到了,则直接开始执行。
  3. 全局队列检查

    • 如果本地队列没有可用的 G,M 会查看全局队列(GRQ)。全局队列是所有空闲或等待分配到某个 P 上运行的 goroutines 集合。
    • 从 GRQ 中取出一批 G 放入当前 P 的本地队列,以便后续调度。
  4. 工作窃取机制

    • 如果全局队列也没有可用 G,那么 M 将尝试从其他随机选择的一些活跃且有任务积压在其 LRQ 中的 P 窃取一半数量放到自己的 LRQ。这种机制称为“工作窃取”,用于平衡负载和提高 CPU 利用率。
  5. 执行 Goroutine

    • 一旦成功获得了待运行 Goroutine(无论是通过哪种方式),M 开始实际执行这个 Goroutine。当这个 Goroutine 执行完成后,重复上述过程以寻找下一个可以被调度和运行的新任务。
  6. 持续循环

    • 这种寻找、分配、窃取和执行过程会不断循环进行,以确保所有创建出来但尚未完成工作的 Goroutines 能够尽快得到处理,同时最大化利用系统资源来提高程序性能。

通信与同步

  • Go 提供了 Channel 来实现不同 Goroutines 间的数据传递和同步,这是一种 CSP(Communicating Sequential Processes)模型,实现了安全且高效的数据共享机制。

  • 除此之外,还可以使用 sync 包中的互斥锁、条件变量等进行更细粒度控制,但 Channel 是最常用且推荐的方法,因为它鼓励通过消息传递而非共享数据来实现协作。

数据竞争

在 Go 中,goroutines 是并发执行的基本单位,它们可以在同一时间访问共享数据。这种并发性虽然提高了程序的效率,但也带来了数据竞争和不安全访问的问题。

尽管有锁等可以解决数据安全问题,但我们仍然尽可能的避免可变共享状态,通过将需要修改的数据封装到单个 goroutine 中,并通过 channel 与其他部分交互,可以有效降低复杂性和错误风险。

数据竞争(Data Race)

  • 定义:当两个或多个 goroutines 同时访问相同的内存位置,并且至少有一个是写操作时,就会发生数据竞争。
  • 影响:数据竞争可能导致不可预测的行为、程序崩溃或错误结果。

示例代码:

func main() {
	for i := 0; i < 100; i++ {
		go modify()
		go modify2()
	}
	fmt.Println(a) // 每次执行输出结果都可能不为1
}

var a = 1

func modify() {
	a += 1
}

func modify2() {
	a -= 1
}

由于 Go 语言运行时调度 goroutine 的执行顺序是不确定的,最终 a 的值可能是增加、减少,甚至可能保持不变。这取决于 goroutine 执行的顺序和时机。

同时可以使用 Go 的 race detector 来检测代码中的潜在数据竞争问题。在编译或运行时添加 -race 标志即可启用此功能。

go run -race test_main.go

输出:

==================
WARNING: DATA RACE
Read at 0x0000111c15f0 by goroutine 6:
  main.modify()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:18 +0x24

Previous write at 0x0000111c15f0 by goroutine 8:
  main.modify()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:18 +0x3c

Goroutine 6 (running) created at:
  main.main()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:9 +0x32

Goroutine 8 (finished) created at:
  main.main()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:9 +0x32
==================
...
...
...
==================
WARNING: DATA RACE
Read at 0x0000111c15f0 by main goroutine:
  main.main()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:12 +0x5e

Previous write at 0x0000111c15f0 by goroutine 22:
  main.modify()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:18 +0x3c

Goroutine 22 (finished) created at:
  main.main()
      /Users/fangyirui/GolandProjects/awesomeProject/_29goroutine/test_main.go:9 +0x32
==================
2
Found 7 data race(s)
exit status 66

互斥锁 (sync.Mutex)

  • 基本概念:互斥锁(Mutex)是一种最基本的同步原语,用于保护临界区,以确保同一时间只有一个 goroutine 可以访问被保护的数据。

  • 使用方法

    • Lock(): 获取互斥锁。如果已经被其他 goroutine 锁定,则阻塞直到获取到该锁。
    • Unlock(): 释放互斥锁。
    var (
    	count = 1
    	mu    = sync.Mutex{}
    	wg    = sync.WaitGroup{}
    )
    
    func add() {
    	mu.Lock()
    	count += 1
    	mu.Unlock()
    	wg.Done()
    }
    
    func minus() {
    	mu.Lock()
    	count -= 1
    	mu.Unlock()
    	wg.Done()
    }
    
    func Test1(t *testing.T) {
    
    	for i := 0; i < 1000; i++ {
    		wg.Add(1)
    		go add()
    		wg.Add(1)
    		go minus()
    	}
    
    	wg.Wait()
    
    	fmt.Println(count)
    }
    

读写锁 (sync.RWMutex)

  • 基本概念:读写互斥锁允许多个读取操作同时进行,但写操作是独占的。这意味着在持有读锁时,可以允许其他 goroutine 获取读锁,但不能获取写锁;而持有写锁时,任何其他 goroutine 都不能获取读或写。

  • 使用方法

    • RLock() / RUnlock():用于读取操作,允许多个同时存在。
    • Lock() / Unlock():用于写入操作,是独占性的,与sync.Mutex的没啥区别。
    var (
    	data    = 0
    	rwMutex = sync.RWMutex{}
    )
    
    // 读操作
    func readData() {
    	rwMutex.RLock() // 加读锁
    	fmt.Println("读取数据:", data)
    	time.Sleep(1 * time.Second) // 模拟读操作耗时
    	rwMutex.RUnlock()           // 释放读锁
    	wg.Done()
    }
    
    // 写操作
    func writeData(value int) {
    	rwMutex.Lock() // 加写锁
    	data = value
    	fmt.Println("写入数据:", data)
    	rwMutex.Unlock() // 释放写锁
    	wg.Done()
    }
    
    func Test2(t *testing.T) {
    	// 启动多个读操作
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go readData()
    	}
    
    	// 启动一个写操作
    	wg.Add(1)
    	go writeData(100)
    
    	wg.Wait()
    }
    

注:如果不加读锁,结果会比较不可控,读锁还是可以保证写入的原子性,即保证同时读的结果都一致

原子操作 (sync/atomic)

  • 提供了一组底层函数,用于对整数值进行原子加载、存储和修改。这些函数可以避免使用锁,从而提高性能。
var count2 int32 = 0

func addAtomic() {
	atomic.AddInt32(&count2, 1)
	wg.Done()
}

func minusAtomic() {
	atomic.AddInt32(&count2, -1)
	wg.Done()
}

func Test3(t *testing.T) {
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go addAtomic()
		wg.Add(1)
		go minusAtomic()
	}

	wg.Wait()

	fmt.Println(count2)
}