Go从入门到精通(19)
协程(goroutine)与通道(channel)
文章目录
前言
Go 原生支持应用之间的通信(网络,客户端和服务端,分布式计算)和程序的并发。程序可以在不同的处理器和计算机上同时执行不同的代码段。Go 语言为构建并发程序的基本代码块是 协程 (goroutine) 与通道 (channel)。
不要通过共享内存来通信,而通过通信来共享内存。
并发、并行和协程
什么是协程
一个应用程序是运行在机器上的一个进程;进程是一个运行在自己内存地址空间里的独立执行体。一个进程由一个或多个操作系统线程组成,这些线程其实是共享同一个内存地址空间的一起工作的执行体。几乎所有’正式’的程序都是多线程的,以便让用户或计算机不必等待,或者能够同时服务多个请求(如 Web 服务器),或增加性能和吞吐量(例如,通过对不同的数据集并行执行代码)。一个并发程序可以在一个处理器或者内核上使用多个线程来执行任务,但是只有同一个程序在某个时间点同时运行在多核或者多处理器上才是真正的并行。
并行是一种通过使用多处理器以提高速度的能力。所以并发程序可以是并行的,也可以不是。
公认的,使用多线程的应用难以做到准确,最主要的问题是内存中的数据共享,它们会被多线程以无法预知的方式进行操作,导致一些无法重现或者随机的结果(称作 竞态)。
不要使用全局变量或者共享内存,它们会给你的代码在并发运算的时候带来危险。
在 Go 中,应用程序并发处理的部分被称作 goroutines(协程),它可以进行更有效的并发运算。在协程和操作系统线程之间并无一对一的关系:协程是根据一个或多个线程的可用性,映射(多路复用,执行于)在他们之上的;协程调度器在 Go 运行时很好的完成了这个工作。
协程工作在相同的地址空间中,所以共享内存的方式一定是同步的;这个可以使用 sync 包来实现,不过我们很不鼓励这样做:Go 使用 channels 来同步协程
使用 GOMAXPROCS
GOMAXPROCS 是 Go 语言中一个重要的环境变量和 runtime 包提供的函数,用于控制 Go 程序中可同时执行的最大 CPU 核心数。它直接影响 Go 程序的并发性能和资源利用率。
基本概念
GOMAXPROCS 的值决定了 Go 运行时系统(Goroutine Scheduler)中逻辑处理器(P)的数量。每个逻辑处理器(P)对应一个操作系统线程(M),而 Goroutine(G)会被调度到这些线程上执行。
- 默认值:等于物理 CPU 核心数(runtime.NumCPU())。
- 作用:控制并行度,避免过多线程导致的上下文切换开销。
如何设置 GOMAXPROCS
有两种方式可以设置 GOMAXPROCS:
方式一:通过环境变量
在运行程序前设置环境变量:
# Linux/macOS
export GOMAXPROCS=4
# Windows
set GOMAXPROCS=4
# 运行程序
go run your_program.go
方式二:通过代码动态调整
在程序中使用 runtime.GOMAXPROCS() 函数:
package main
import (
"fmt"
"runtime"
)
func main() {
// 获取当前 GOMAXPROCS 值
old := runtime.GOMAXPROCS(4) // 设置为 4 个核心
fmt.Printf("Old GOMAXPROCS: %d, New: %d\n", old, runtime.GOMAXPROCS(0))
// 动态调整(例如根据负载)
runtime.GOMAXPROCS(runtime.NumCPU() / 2) // 使用一半的 CPU 核心
}
适用场景
- CPU 密集型任务:设置为物理 CPU 核心数(默认即可),充分利用多核性能。
- IO 密集型任务:可设置为大于 CPU 核心数的值(如 2*NumCPU()),因为等待 IO 时线程会被阻塞,需要更多线程处理其他 Goroutine。
性能调优建议
- 避免过度设置:过大的 GOMAXPROCS 会导致频繁的上下文切换,降低性能。
- 基准测试:通过 go test -bench 对比不同 GOMAXPROCS 设置下的性能。
- 监控工具:使用 pprof 分析调度器行为和阻塞情况。
示例:对比不同 GOMAXPROCS 的性能
以下是一个简单的基准测试代码,对比不同 GOMAXPROCS 设置下的 CPU 密集型任务性能:
package main
import (
"math/rand"
"runtime"
"sync"
"testing"
"time"
)
func BenchmarkGOMAXPROCS(b *testing.B) {
// 测试不同 GOMAXPROCS 值
for _, n := range []int{1, 2, 4, 8} {
b.Run(fmt.Sprintf("GOMAXPROCS=%d", n), func(subB *testing.B) {
old := runtime.GOMAXPROCS(n)
defer runtime.GOMAXPROCS(old) // 恢复原值
subB.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 模拟 CPU 密集型计算
sum := 0
for j := 0; j < 1000000; j++ {
sum += rand.Intn(100)
}
_ = sum
}()
}
wg.Wait()
})
}
}
运行测试
go test -bench=. -benchmem
小结
- 默认值适合大多数场景:Go 的调度器已经优化得很好,通常无需手动调整。
- 动态调整:对于混合负载的应用,可以根据实时监控数据动态调整 GOMAXPROCS。
- Go 1.5+ 后:默认采用 GOMAXPROCS=NumCPU(),无需手动设置即可利用多核。
通过合理设置 GOMAXPROCS,你可以充分发挥 Go 语言的并发优势,避免资源浪费。
协程是独立的处理单元,一旦陆续启动一些协程,你无法确定他们是什么时候真正开始执行的。你的代码逻辑必须独立于协程调用的顺序。
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("In main()")
go longWait()
go shortWait()
fmt.Println("About to sleep in main()")
// sleep works with a Duration in nanoseconds (ns) !
time.Sleep(10 * 1e9)
fmt.Println("At the end of main()")
}
func longWait() {
fmt.Println("Beginning longWait()")
time.Sleep(5 * 1e9) // sleep for 5 seconds
fmt.Println("End of longWait()")
}
func shortWait() {
fmt.Println("Beginning shortWait()")
time.Sleep(2 * 1e9) // sleep for 2 seconds
fmt.Println("End of shortWait()")
}
输出
In main()
About to sleep in main()
Beginning longWait()
Beginning shortWait()
End of shortWait()
End of longWait()
At the end of main() // after 10s
为了对比使用一个线程,连续调用的情况,移除 go 关键字,重新运行程序。
输出
In main()
Beginning longWait()
End of longWait()
Beginning shortWait()
End of shortWait()
About to sleep in main()
At the end of main() // after 17 s
协程间的信道
概念
在上面例子中,协程是独立执行的,他们之间没有通信。他们必须通信才会变得更有用:彼此之间发送和接收信息并且协调/同步他们的工作。协程可以使用共享变量来通信,但是很不提倡这样做,因为这种方式给所有的共享内存的多线程都带来了困难。
而 Go 有一种特殊的类型,通道(channel),就像一个可以用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。
通常使用这样的格式来声明通道:var identifier chan datatype
未初始化的通道的值是nil。
所以通道只能传输一种类型的数据,比如 chan int 或者 chan string,所有的类型都可以用于通道,空接口 interface{} 也可以。甚至可以(有时非常有用)创建通道的通道。
通道实际上是类型化消息的队列:使数据得以传输。它是先进先出(FIFO)的结构所以可以保证发送给他们的元素的顺序(有些人知道,通道可以比作 Unix shells 中的双向管道(two-way pipe))。通道也是引用类型,所以我们使用 make() 函数来给它分配内存。这里先声明了一个字符串通道 ch1,然后创建了它(实例化):
var ch1 chan string
ch1 = make(chan string)
//或者
ch1 := make(chan string)
//int
chanOfChans := make(chan int)。
//函数
funcChan := make(chan func())
所以通道是第一类对象:可以存储在变量中,作为函数的参数传递,从函数返回以及通过通道发送它们自身。另外它们是类型化的,允许类型检查,比如尝试使用整数通道发送一个指针。
通信操作符 <-
这个操作符直观的标示了数据的传输:信息按照箭头的方向流动。
流向通道(发送)
- ch <- int1 表示:用通道 ch 发送变量 int1(双目运算符,中缀 = 发送)
从通道流出(接收)
int2 = <- ch 表示:变量 int2 从通道 ch(一元运算的前缀操作符,前缀 = 接收)接收数据(获取新值);假设 int2 已经声明过了,如果没有的话可以写成:int2 := <- ch。
<- ch 可以单独调用获取通道的(下一个)值,当前值会被丢弃,但是可以用来验证,所以以下代码是合法的:
if <- ch != 1000{
...
}
同一个操作符 <- 既用于发送也用于接收,但Go会根据操作对象弄明白该干什么 。虽非强制要求,但为了可读性通道的命名通常以 ch 开头或者包含 chan。通道的发送和接收都是原子操作:它们总是互不干扰的完成的。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go sendData(ch)
go getData(ch)
time.Sleep(1e9)
}
func sendData(ch chan string) {
ch <- "Washington"
ch <- "Tripoli"
ch <- "London"
ch <- "Beijing"
ch <- "Tokyo"
}
func getData(ch chan string) {
var input string
// time.Sleep(2e9)
for {
input = <-ch
fmt.Printf("%s ", input)
}
}
输出
Washington Tripoli London Beijing tokyo
main() 函数中启动了两个协程:sendData() 通过通道 ch 发送了 5 个字符串,getData() 按顺序接收它们并打印出来。
如果 2 个协程需要通信,你必须给他们同一个通道作为参数才行。
通道阻塞
默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送不会结束。可以想象一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:
对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。
对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。
看下面的例子,一个协程在无限循环中给通道发送整数数据。不过因为没有接收者,只输出了一个数字 0。
package main
import "fmt"
func main() {
ch1 := make(chan int)
go pump(ch1) // pump hangs
fmt.Println(<-ch1) // prints only 0
}
func pump(ch chan int) {
for i := 0; ; i++ {
ch <- i
}
}
输出
0
pump() 函数为通道提供数值,也被叫做生产者。
为通道解除阻塞定义了 suck 函数来在无限循环中读取通道,
func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}
在 main() 中使用协程开始它:
go pump(ch1)
go suck(ch1)
time.Sleep(1e9)
给程序 1 秒的时间来运行:输出了上万个整数。
同步通道-使用带缓冲的通道
一个无缓冲通道只能包含 1 个元素,有时显得很局限。我们给通道提供了一个缓存,可以在扩展的 make 命令中设置它的容量,如下:
buf := 100
ch1 := make(chan string, buf)
buf 是通道可以同时容纳的元素(这里是 string)个数
在缓冲满载(缓冲被全部使用)之前,给一个带缓冲的通道发送数据是不会阻塞的,而从通道读取数据也不会阻塞,直到缓冲空了。
缓冲容量和类型无关,所以可以(尽管可能导致危险)给一些通道设置不同的容量,只要他们拥有同样的元素类型。内置的 cap 函数可以返回缓冲区的容量。
如果容量大于 0,通道就是异步的了:缓冲满载(发送)或变空(接收)之前通信不会阻塞,元素会按照发送的顺序被接收。如果容量是0或者未设置,通信仅在收发双方准备好的情况下才可以成功。
同步:ch :=make(chan type, value)
- value == 0 -> synchronous, unbuffered (阻塞)
- value > 0 -> asynchronous, buffered(非阻塞)取决于value元素
同步:ch :=make(chan type, value)
若使用通道的缓冲,你的程序会在“请求”激增的时候表现更好:更具弹性,专业术语叫:更具有伸缩性(scalable)。在设计算法时首先考虑使用无缓冲通道,只在不确定的情况下使用缓冲。
协程中用通道输出结果
为了知道计算何时完成,可以通过信道回报。在例子 go sum(bigArray) 中,要这样写:
ch := make(chan int)
go sum(bigArray, ch) // bigArray puts the calculated sum on ch
// .. do something else for a while
sum := <- ch // wait for, and retrieve the sum
也可以使用通道来达到同步的目的,这个很有效的用法在传统计算机中称为信号量(semaphore)。或者换个方式:通过通道发送信号告知处理已经完成(在协程中)。
在其他协程运行时让 main 程序无限阻塞的通常做法是在 main 函数的最后放置一个 select {}。
也可以使用通道让 main 程序等待协程完成,就是所谓的信号量模式,我们会在接下来的部分讨论。
信号量模式
下边的片段阐明:协程通过在通道 ch 中放置一个值来处理结束的信号。main 协程等待 <-ch 直到从中获取到值。
我们期望从这个通道中获取返回的结果,像这样:
func compute(ch chan int){
ch <- someComputation() // when it completes, signal on the channel.
}
func main(){
ch := make(chan int) // allocate a channel.
go compute(ch) // start something in a goroutines
doSomethingElseForAWhile()
result := <- ch
}
这个信号也可以是其他的,不返回结果,比如下面这个协程中的匿名函数(lambda)协程:
ch := make(chan int)
go func(){
// doSomething
ch <- 1 // Send a signal; value does not matter
}()
doSomethingElseForAWhile()
<- ch // Wait for goroutine to finish; discard sent value.
实现并行的 for 循环
for i, v := range data {
go func (i int, v float64) {
doSomething(i, v)
...
} (i, v)
}
给通道使用 for 循环
for 循环的 range 语句可以用在通道 ch 上,便可以从通道中获取值,像这样:
for v := range ch {
fmt.Printf("The value is %v\n", v)
}
它从指定通道中读取数据直到通道关闭,才继续执行下边的代码。很明显,另外一个协程必须写入 ch(不然代码就阻塞在 for 循环了),而且必须在写入完成后才关闭。suck 函数可以这样写,且在协程中调用这个动作,程序变成了这样:
package main
import (
"fmt"
"time"
)
func main() {
suck(pump())
time.Sleep(1e9)
}
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func suck(ch chan int) {
go func() {
for v := range ch {
fmt.Println(v)
}
}()
}
生产者消费者模式
假设你有 Produce() 函数来产生 Consume 函数需要的值。它们都可以运行在独立的协程中,生产者在通道中放入给消费者读取的值。整个处理过程可以替换为无限循环:
for {
Consume(Produce())
}
通道的方向
通道类型可以用注解来表示它只发送或者只接收:
var send_only chan<- int // channel can only receive data
var recv_only <-chan int // channel can only send data
只接收的通道(<-chan T)无法关闭,因为关闭通道是发送者用来表示不再给通道发送值了,所以对只接收通道是没有意义的。通道创建的时候都是双向的,但也可以分配有方向的通道变量,就像以下代码:
var c = make(chan int) // bidirectional
go source(c)
go sink(c)
func source(ch chan<- int){
for { ch <- 1 }
}
func sink(ch <-chan int) {
for { <-ch }
}
管道和选择器模式就是来源与此
协程的同步:关闭通道-测试阻塞的通道
通道可以被显式的关闭;尽管它们和文件不同:不必每次都关闭。只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。
ch := make(chan float64)
//通过close(ch)
defer close(ch)
//通过ok模式
if v, ok := <-ch; ok {
process(v)
}
使用 select 切换协程
从不同的并发执行的协程中获取值可以通过关键字select来完成,它和switch控制语句非常相似也被称作通信开关;它的行为像是“你准备好了吗”的轮询机制;select监听进入通道的数据,也可以是用通道发送值的时候。
select {
case u:= <- ch1:
...
case v:= <- ch2:
...
...
default: // no value ready to be received
...
}
default 语句是可选的;fallthrough 行为,和普通的 switch 相似,是不允许的。在任何一个 case 中执行 break 或者 return,select 就结束了。
select 做的就是:选择处理列出的多个通信情况中的一个。
- 如果都阻塞了,会等待直到其中一个可以处理
- 如果多个可以处理,随机选择一个
- 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。
在 select 中使用发送操作并且有 default 可以确保发送不被阻塞!如果没有 default,select 就会一直阻塞。
select 语句实现了一种监听模式,通常用在(无限)循环中;在某种情况下,通过 break 语句使循环退出。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go pump1(ch1)
go pump2(ch2)
go suck(ch1, ch2)
time.Sleep(1e9)
}
func pump1(ch chan int) {
for i := 0; ; i++ {
ch <- i * 2
}
}
func pump2(ch chan int) {
for i := 0; ; i++ {
ch <- i + 5
}
}
func suck(ch1, ch2 chan int) {
for {
select {
case v := <-ch1:
fmt.Printf("Received on channel 1: %d\n", v)
case v := <-ch2:
fmt.Printf("Received on channel 2: %d\n", v)
}
}
}
在程序中有 2 个通道 ch1 和 ch2,三个协程 pump1()、pump2() 和 suck()。这是一个典型的生产者消费者模式。在无限循环中,ch1 和 ch2 通过 pump1() 和 pump2() 填充整数;suck() 也是在无限循环中轮询输入的,通过 select 语句获取 ch1 和 ch2 的整数并输出。选择哪一个 case 取决于哪一个通道收到了信息。程序在 main 执行 1 秒后结束。
通道、超时和计时器(Ticker)
time 包中有一些有趣的功能可以和通道组合使用。
其中就包含了 time.Ticker 结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:
type Ticker struct {
C <-chan Time // the channel on which the ticks are delivered.
// contains filtered or unexported fields
...
}
时间间隔的单位是 ns(纳秒,int64),在工厂函数 time.NewTicker 中以 Duration 类型的参数传入:func NewTicker(dur) *Ticker。
在协程周期性的执行一些事情(打印状态日志,输出,计算等等)的时候非常有用。
调用 Stop() 使计时器停止,在 defer 语句中使用。这些都很好的适应 select 语句:
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
...
case v:= <-ch2:
...
case <-ticker.C:
logState(status) // call some logging function logState
default: // no value ready to be received
...
}
协程和恢复(recover)
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}
上边的代码,如果 do(work) 发生 panic,错误会被记录且协程会退出并释放,而其他协程不受影响。
实现 Futures 模式
所谓Futures就是指:有时候在你使用某一个值之前需要先对其进行计算。这种情况下,你就可以在另一个处理器上进行该值的计算,到使用时,该值就已经计算完毕了。
Futures模式通过闭包和通道可以很容易实现,类似于生成器,不同地方在于Futures需要返回一个值。
假设我们有一个矩阵类型,我们需要计算两个矩阵A和B乘积的逆,首先我们通过函数Inverse(M)分别对其进行求逆运算,再将结果相乘。如下函数InverseProduct()实现了如上过程:
func InverseProduct(a Matrix, b Matrix) {
a_inv := Inverse(a)
b_inv := Inverse(b)
return Product(a_inv, b_inv)
}
在这个例子中,a和b的求逆矩阵需要先被计算。那么为什么在计算b的逆矩阵时,需要等待a的逆计算完成呢?显然不必要,这两个求逆运算其实可以并行执行的。换句话说,调用Product函数只需要等到a_inv和b_inv的计算完成。如下代码实现了并行计算方式:
func InverseProduct(a Matrix, b Matrix) {
a_inv_future := InverseFuture(a) // start as a goroutine
b_inv_future := InverseFuture(b) // start as a goroutine
a_inv := <-a_inv_future
b_inv := <-b_inv_future
return Product(a_inv, b_inv)
}
InverseFuture函数以goroutine的形式起了一个闭包,该闭包会将矩阵求逆结果放入到future通道中:
func InverseFuture(a Matrix) chan Matrix {
future := make(chan Matrix)
go func() {
future <- Inverse(a)
}()
return future
}
当开发一个计算密集型库时,使用Futures模式设计API接口是很有意义的。在你的包使用Futures模式,且能保持友好的API接口。此外,Futures可以通过一个异步的API暴露出来。这样你可以以最小的成本将包中的并行计算移到用户代码中。