吃透 Golang 基础:Goroutine

发布于:2025-06-20 ⋅ 阅读:(20) ⋅ 点赞:(0)

Goroutine

在这里插入图片描述

在 Golang 中,每一个并发的执行单元叫做一个 Goroutine(所以,承载 main 函数执行逻辑的 goroutine 就是 main goroutine)。

Goroutine 的行为很像 Python 当中的 Thread,但前者本质上是协程,而后者是线程,在面试的过程中通常会被问到 Golang 并发控制中 goroutine 与系统线程有什么区别,简单的回答就是:

  • goroutine 与系统线程相比,创建与销毁的开销较小;
  • goroutine 的调度发生在用户态(因为它是轻量级的线程),协程之间的切换成本极低,而系统线程的调度在内核发生,切换成本较高;
  • goroutine 初始栈的规模较小,可动态增长,而系统线程的内存栈规模固定;
  • goroutine 的通信可通过 channel 完成,而系统线程之间的通信依赖于共享内存和锁机制。

程序启动时,main 函数运行在 main goroutine 上,新的 goroutine 可以通过关键字go来进行创建:

f()
go f()	// create a new goroutine that calls f()

main 函数返回时,所有其下辖的子 goroutine 都将会直接中断,程序退出。

示例 1:并发的 Clock 服务

网络编程是并发大显身手的领域,我曾在面试过程中被问到过一个问题:Goroutine 更适合 CPU 密集型作业还是 I/O 密集型作业,由于当时处于初学阶段,并没有回答上来这个问题。实际上,Goroutine 更适合 I/O 密集型作业,尤其适合处理网络当中的 I/O 作业,服务器是最典型的需要同时处理许多连接的程序,这些连接一般来自彼此互相独立的客户端。

本小节会使用 Go 的网络库net,这个包提供了编写网络客户端或服务器程序的基本组件,基于net可以直接编写一个轻量级的并发 TCP 服务器(zinx项目就是一个很好的例子)。

下例实现了一个顺序执行的时钟服务器,它会每隔一秒将当前时间写到客户端:

package main

import (
	"io"
	"log"
	"net"
	"time"
)

func main() {
	listener, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatal(err)
	}
	
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal(err)
			continue
		}
		handleConn(conn)
	}
}

func handleConn(conn net.Conn) {
	defer conn.Close()
	for {
		_, err := io.WriteString(conn, time.Now().Format("15:04:05\n"))
    // time.Time.Format 提供了一种格式化日期和时间信息的方式. 参数是格式化模版.
		if err != nil {
			return
		}
		time.Sleep(1 * time.Second)
	}
}

上述程序实现了一个简单的 TCP 服务器,它只会像客户端提供一种服务,那就是每隔一秒将当前的时间按照格式以字节流的方式发送给客户端。现在我们再编写一个客户端程序,通过net.Dial接通 socket,来接收字节流:

package main

import (
	"io"
	"log"
	"net"
	"os"
)

func main() {
	conn, err := net.Dial("tcp", "localhost:8080")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	mustCopy(os.Stdout, conn)
}

func mustCopy(dst io.Writer, src io.Reader) {
	if _, err := io.Copy(dst, src); err != nil {
		log.Fatal(err)
	}
}

先启动服务器,再启动客户端,上述代码确实可以正常工作,但是服务器无法同时对两个客户端提供工作。服务器将会阻塞地等待第一个客户端断开连接,才能够为第二个客户端提供服务。但是通过简单地为handleConn这个业务函数加上go关键字,它就会新建一个 goroutine 来处理当前这个 socket 连接:

// ... ... ...
for {
    conn, err := listener.Accept()
    if err != nil {
        log.Fatal(err)
        continue
    }
    go handleConn(conn)
}
// ... ... ...

示例 2:两个 Goroutine 交替收发数据

这个示例不来自于《Go 语言圣经》,而是我根据目前一些公司的面试经验想到的。在面试时,面试官可能不让我们手撕一道算法题,而是让我们在本地 IDE 现场编写一段 Go 程序,实现两个 goroutine 之间互发数据。

在文章的开头已经提到,goroutine 之间基于 channel 进行通信,而不是基于共享内存进行通信,所以我们使用 channel 来实现两个 goroutine 之间的数据收发,在复习 channel 之前先回顾一下 channel 的具体用法。

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1, ch2 := make(chan int), make(chan int)
	cnt := 0 // 一个全局的计数器
	// ch1, ch2 是两个 channel
	go func() {
		for { // 在这个 goroutine 中先接收, 后发送
			val := <-ch1
			// ⬆️ 由于 ch1 不带缓冲区, 阻塞地等待数据到来
			fmt.Println(val)
			ch2 <- cnt
			cnt++
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		for { // 在这个 goroutine 中先发送, 后接收
			ch1 <- cnt
			cnt++
			val := <-ch2
			fmt.Println(val)
			time.Sleep(1 * time.Second)
		}
	}()

	ch := make(chan struct{})
	<-ch
}

基于这样一段简单的程序,就可以回顾许多与 Golang 语法相关的关键问题。在这个程序当中,我们开启了两个 goroutine,第一个 goroutine 首先从 channel 接收数据,然后再发送数据;而第二个 goroutine 首先向 channel 发送数据,然后再接收数据。必须要这样做,否则如果在两个 goroutine 当中都先发送数据,再接收数据,执行这段代码将会直接报错,报错的原因是发生死锁。

发生死锁的原因与 channel 的特性有关,channel 的相关内容我们将在下一小节进行回顾,但是在此不妨先强调一下无缓冲区 channel 的一条特性,那就是:无缓冲区的 channel 发送数据后,会被阻塞,直到 channel 发送的数据被接收。如果同时先发送数据,那么两个 goroutine 都将会被阻塞,直到有变量从 channel 接收数据,但由于两个 goroutine 都被阻塞了,没有实例去接收 channel 的数据,会导致整个程序死锁。

使用带有缓冲区的 channel 可以不用考虑从 channel 接收与发送数据的顺序问题,这一点将在下一节进行回顾。上述程序的运行结果是:

0
1
2
3
4
5
6
7
8
9
10
11
12
13
// ... ... ...

顺带一提的是,在程序的结尾建立了一个类型为struct{}的 channel,它不带有缓冲区,通常被用于在两个 goroutine 之间进行同步。在 main 函数的结尾,使用<- ch阻塞了 main 函数,可以通过在终端键入ctrl + C来终止程序,这样就避免了因 main 函数返回而所以 goroutine 被杀死的情况。

Channel

如果说 goroutine 是 Golang 的并发体,那么 channels 就是 goroutine 之间的通信机制。一个 channel 就可以被视为一个通信管道,它可以让一个 goroutine 给另一个 goroutine 发送值信息

每个 channel 都具有类型,这个类型在创建 channel 的时候指定,channel 只能发送指定类型的值数据。例如,一个 int 类型的 channel 一般写作chan int

通过内建的函数make,我们可以创建一个 channel。

ch := make(chan int)
// 💡 值得注意的一点是, 关键字 make 只能用于创建 slice/channel/map 三种内置类型

和 map 类似,channel 也对应一个 make 创建的底层数据结构的引用(因此,Golang 的引用类型包括 slice/channel/map)。(下面这段话要好好理解:当我们复制一个 channel 或是将 channel 作为实参传入函数时,我们只是拷贝了 channel 的引用(其中包含底层数据结构的指针,这个指针的值也会被拷贝,也就是指针指向的地址不变,即引用的底层数据结构不变),因此调用者和被调用者将引用同一个 channel 对象。和其他引用类型一样,channel 的零值也是 nil。

两个类型相同的 channel 可以使用==进行比较。如果 channel 的引用对象相同,那么==为 true。channel 也可以和 nil 进行比较。

channel 有发送和接收两个主要操作,二者都是通信行为。一个发送语句将一个值从 goroutine 通过 channel 发送到另一个执行接受操作的 goroutine。发送和接收两个操作都使用<-运算符:

ch <- x		// a send statement
x = <- ch	// a receive expression in an assignment statement
// ⬆️ 也就意味着, 接收语句可以没有接收对象, 此时会直接丢弃接收值
<- ch		// a receive statement, result will be discarded

channel 还支持 close 操作,用于关闭 channel,随后,基于该 channel 的发送操作都会导致 panic 异常(可以重点记忆一下,这一点我在最近的一次面试中被考察过)。对于一个已经 close 过的 channel,通过接收操作仍然可以接收到之前已经发送到这个 channel 当中的数据,如果 channel 已经没有数据了,那么接收操作会得到一个零值。

使用内置的 close 函数可以关闭 channel:

close(ch)

以最简单的 make 创建的 channel 是无缓冲区的,但我们可以指定 make 当中的第二个参数,对应的是 channel 的缓冲区容量。如果 channel 的缓冲区容量大于 0,那么这个 channel 就是一个带缓冲区的 channel:

ch = make(chan int)		// unbuffered channel
ch = make(chan int, 0)	// unbuffered channel
ch = make(chan int, 3)	// buffered channel with capacity 3

小结:有关 channel 的状态

根据上面这段描述,如果一个 channel 已经被关闭,那么再次通过这个 channel 进行数据发送,将会直接导致 panic 异常;而针对已经关闭的 channel,继续进行接收操作是没有问题的,如果 channel 当中的值已经全部被接收(channel 可能带有缓冲区),那么接收操作得到的将会是零值。

需要注意的是,可以使用二值来对 channel 进行接收操作,第二个值是一个 bool 类型,标识当前 channel 是否关闭。

带缓冲区的 channel

我们在最开始 goroutine 的 demo 2 当中已经提到,一个 unbuffered channel 在执行发送操作之后,该 goroutine 会被阻塞,直到另一个 goroutine 从这个 channel 当中接收数据(当然,等待草这个 channel 当中接收数据的 goroutine 也会阻塞),两个阻塞的 goroutine 才会继续执行后续的逻辑。

反之也一样成立,如果一个 goroutine 当中使用 unbuffered channel 接收数据,那么这个 goroutine 将会直接阻塞,直到另一个 goroutine 通过这个 channel 发送过来数据,两个 goroutine 才会继续执行后面的语句。

「注意潜在的 deadlock 问题:基于上述分析,如果两个 goroutine 通过一个 unbuffered channel 通信,其中一个必须先发送数据,另一个必须先接收数据,否则程序将会直接崩溃并报错检测到死锁,一个很好的实践就是上述 goroutine 的 demo 2,一个 goroutine 首先从 unbuffered channel 接收数据,另一个 goroutine 首先向 unbuffered channel 发送数据,避免了 deadlock」

基于 unbuffered channel 的发送和接收操作将导致两个 goroutine 进行一次同步操作,基于这个原因,unbuffered channel 有时也会被称为同步 channels。当通过一个 unbuffered channel 发送数据时,接收者收到数据发生在(happens before,这个概念在此处被《Go 语言圣经》重点强调)再次唤醒发送者 goroutine 之前。

在并发编程的概念当中,当我们说 x happens before y 时,不是说 x 事件的发生时间比 y 早,而是说要保证在 y 事件发生之前,x 事件已经全部结束了。

基于 channels 发送消息需要注意两个方面。首先,每个消息都有一个值,但有时候通讯的事实和发生的时刻同样重要。当我们更希望强调通讯发生的时刻时,我们将它称之为消息事件。有些消息事件并不携带额外的信息,它仅仅是用作两个 goroutine 之间的同步,这个时候我们可以使用struct{}空结构体作为 channel 的元素类型。一个基于chan struct{}进行事件同步的例子如下:

func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}

串联的 channel(pipeline)

channel 也可以用于将多个 goroutine 连接在一起。一个 channel 的输出将会作为下一个 channel 的输入。这种串联的 channel 就是所谓的管道(pipeline)。下面的例子将三个 goroutine 串联了起来:

package main

import (
	"fmt"
	"time"
)

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	go func() {
		for x := 0; ; x++ {
			naturals <- x
			time.Sleep(1 * time.Second)
		}
	}()

	go func() {
		for {
			x := <-naturals
			squares <- x * x
		}
	}()

	for {
		fmt.Println(<-squares)
	}
}

上述程序将两个子 goroutine 和一个主 goroutine 串联了起来。

形如这类串联的 channels 可以用在长时间运行的服务当中,每个长时间运行的 goroutine 一般都包含一个死循环,不同 goroutine 的死循环当中通过 channel 进行通信。

但是,如果我们希望通过 channel 只发送有限的数列该如何处理呢?如果发送者知道,没有更多的值需要发送给 channel 的话,那么我们也应该让接收者知道,从 channel 当中不会取到多余的有用的值了,此时接收者就可以停止不必要的接收等待。

例如,对于上述程序,如果我们不希望 naturals 这个 channel 继续发送值了,可以直接将它 close 掉:

close(naturals)

方才已经提到,当一个 channel 被关闭,任何 goroutine 当中向它写入的行为都将会触发 panic 异常。但是 goroutine 仍然可以从这个 channel 当中接收数据。当一个被关闭的 channel 中已经发送的数据都已经成功被接收了,那么后续的接收操作将不再阻塞 goroutine,它们将立刻返回一个零值。这就意味着,naturals 这个 channel 被关闭之后,squares 所在的 goroutine 仍然可以从 channel 接收到零值,并且由于接受操作在死循环当中,这个 goroutine 将会无限将零值输入到 squares 当中。

我们可以通过二值接收的方式优化 squares 所在 goroutine 接收 naturals 当中值的逻辑。从 channel 二值接收数据时,第二个值是一个 bool 类型,用于标识当前 channel 是否被关闭。利用这个特性,可以修改上述 goroutine 当中的逻辑:

// Squarer
go func() {
    for {
        x, ok := <-naturals
        if !ok {
            break // channel was closed and drained
        }
        squares <- x * x
    }
    close(squares)
}()

由于这种处理逻辑非常常见,Go 从语法层面对上述逻辑进行了优化,我们可以直接通过 for range 语句来读取 channel 当中的数据。当 channel 打开时,for range 会依次从 channel 接收数据;如果 channel 关闭,那么 for range 将会推出循环:

go func() {
    for x := range naturals {
        squares <- x * x
    }
    close(squares)
}()

至此,我们修改最初的 pipeline,让它只打印前 100 个整型数值的平方:

package main

import (
	"fmt"
	"time"
)

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	go func() {
		defer close(naturals)
		for x := 1; x <= 100; x++ {
			naturals <- x
			time.Sleep(1 * time.Millisecond)
		}
	}()

	go func() {
		defer close(squares)
		for x := range naturals {
			squares <- x * x
		}
	}()

	for val := range squares {
		fmt.Println(val)
	}
}

事实上我们不需要关闭每一个 channel。只有当需要告诉接收者 goroutine,所有的数据已经发送完成了的时候,才需要显式地关闭 channel。不管 channel 是否关闭,当 channel 不再被引用时,Go 的 GC 都会自动将这些内存回收(所有,channel 建在堆上)。

试图重复关闭一个已经关闭的 channel,将会触发 panic 异常,试图关闭一个 nil 的 channel 也会导致 panic 异常。

小结一下操作 channel 会触发 panic 的情况:

  1. 向已经 close 的 channel 发送数据会导致 panic;
  2. 关闭已经关闭的 channel 会导致 panic;
  3. 关闭 nil 的 channel 会导致 panic。

单方向的 channel

随着程序的规模不断变大,将大的函数拆分为小的模块通常会优化程序在工程上的可读性。方才的例子使用了两个 channel 来串联三个 goroutine,这三个 goroutine 可以被进一步放在三个函数当中:

func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)

计算整型数值平方的 squrer 串联起 counter 和 printer,因此它有两个 chan int 作为形参,分别用于输入和输出,这两个 channel 的类型相同,但是使用方式正好相反,一个用于接收数据,而另一个用于发送数据。形参名 in 和 out 明确表示了这个意图,但是仍然无法从编译器与运行时的角度来确保 in 和 out 不被滥用。

这种场景非常典型,当一个 channel 作为形参时,它一般只用于发送或接收数据。为了避免形参的 channel 被滥用,Go 提供了单向的 channel 类型,分别用于只发送和只接收的 channel。chan<- int表示只发送int的 channel,而<-chan int表示只接收int的 channel。

由于close只用于断言不再向 channel 发送新的数据,所有只有在发送者所在的 goroutine 才会调用 close 函数,对一个只接收的 goroutine 调用 close 将会是编译错误。

以下是改进的版本,参数使用单方向的 channel 类型:

func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

调用 counter 时,naturals 的类型将隐式地转为chan<- int。任何双向 channel 向单向 channel 的赋值操作都会导致该隐式转换。注意,不能将一个单向的 channel 转为双向 channel。

显然,我们能够预想到的是,我们在声明一个 channel 时,通常不会把它声明为单向的,这样的话这个 channel 没有任何意义。单向 channel 通常只出现在函数的形参列表当中,传入的双向 channel 会被隐式地转为单向 channel,意思就是告知这个函数,只能使用 channel 发送或接收数据的单向行为。

Buffered Channel

Buffered Channel 内部持有一个元素队列,队列的最大容量在make创建这个 channel 时由第二个参数指定。下面的语句将会创建一个缓冲区大小为 3 的 channel:

ch := make(chan string, 3)

向缓存的 channel 进行发送操作就是向内部的缓存队列尾部插入数据,而接收操作则是从队列的头部删除元素。如果内部缓存队列已满,那么发送操作将阻塞直到另一个 goroutine 执行接收操作释放新的队列空间。相反,如果 channel 的缓存队列是空的,那么接收操作将阻塞直到有另一个 goroutine 执行发送操作而向队列当中插入元素。

所以我们可以在无阻塞的情况下连续向新建的 channel 发送三个值:

ch <- "A"
ch <- "B"
ch <- "C"

此时会充满ch的缓冲区,第四个写入操作将会被阻塞。

此外我们再接收一个值:

fmt.Println(<-ch)

channel 的缓冲队列将会不满也不空,此时对 channel 执行发送或接收操作都不会阻塞。通过这种方式,channel 的缓存队列解耦了发送和接收消息的 goroutine。

可以通过内置的cap得知 channel 内部缓冲区的容量:

fmt.Println(cap(ch))

通过内置的len,我们可以得知当前 channel 内部有多少个元素:

fmt.Println(len(ch))

连续执行两次接收操作将会情况缓冲队列。此时如果再次执行接收操作,这个操作将会阻塞整个 goroutine:

fmt.Println(<-ch)	// "B"
fmt.Println(<-ch)	// "C"
fmt.Println(<-ch)	// 阻塞

需要注意的一点是,我们不应该将一个 buffered channel 当作队列使用。channel 和 goroutine 的调度器机制紧密相连,如果没有其他 goroutine 从 channel 接收数据,发送者将会永远面临阻塞风险。如果你只是需要一个简单的队列,最好使用 slice。

下例展示了一个 buffered channel 的应用。下面这函数并发地向三个镜像站点发送请求,三个站点分布在不同的地理位置,它们将分别收到响应,并发送到 buffered channel 当中。接收者只接收第一个收到的响应:

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    return <-responses // return the quickest response
}

func request(hostname string) (response string) { /* ... */ }

如果我们将responses定义为 unbuffered channel,那么两个较慢的 goroutine 将会因为没有接收者而永远被卡住,这种情况被称为 goroutine 泄漏,泄漏的 goroutine 与内存不同,它们不会被自动回收,因此需要开发者确保每个 goroutine 正常退出。

基于 select 的多路复用

Go 的 select 机制非常的有趣,它和 switch 语句非常类似,但是 switch 语句是针对某个对象的值作为 case 进行判断而触发事件。select 与 switch 的不同之处在于,它监控的 case 是若干个通信操作(channel 上的发送或接收)。

select {
    case <- ch1:
        // ... ... ...
    case x := <- ch2:
        // ... ... ...
    case ch3 <- y:
        // ... ... ...
    default:
        // ... ... ...
}

select 会等待 case 中有能够执行的 case,当条件满足时,select 会去通信并执行 case 之后的语句。此时,其他通信操作是不会执行的,一个没有任何 case 的 select 语句写作select{},它将会永远等待下去。

这意味着,只要 select 当中有一条 case 之后的通信操作被成功执行,就立刻执行该 case 对应的语句块,其他 case 将会被放弃。

下例设计了一个模拟火箭发射的程序,使用一个计时器 channel 进行倒计时,使用一个 abort channel 模拟发射终止:

func main() {
    // ... create abort channel ...

    fmt.Println("Commencing countdown. Press return to abort")
    select {
        case <- time.After(10 * time.Second):
            // ... ... ...
        case <- abort:
            fmt.Println("Lauch aborted!")
            return
    }
    launch()
}

time.After立刻返回一个 channel,并起一个新的 goroutine 在10 * time.Second向该 channel 发送一个独立的值。

下面的例子更加微妙,使用一个 buffer 为 1 的 channel 打印偶数:

ch := make(chan int, 1)
for i := 0; i < 10; i ++ {
    select {
        case x := <-ch:
            fmt.Println(x)
        case ch <- i:
            continue
    }
}

如果多个 case 同时就绪,select 会随机选择一个执行。

基于 select,我们可以在 goroutine 当中对多个信道进行监听,基于信道上的行为执行相应的逻辑。我们甚至可以将 select 放在死循环当中,来一直监听若干个信道的行为。

select 本身比较常用的场景是进行超时控制会与Context相结合进行 goroutine 上下文的控制。


网站公告

今日签到

点亮在社区的每一天
去签到