Golang——8、协程和管道

发布于:2025-06-06 ⋅ 阅读:(25) ⋅ 点赞:(0)

1、协程

1.1、进程、线程和协程

进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有 5 种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态。

线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位

并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。
并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。

通俗的讲多线程程序在单核CPU上面运行就是并发,多线程程序在多核CUP上运行就是并行,如果线程数大于CPU核数,则多线程程序在多个CPU上面运行既有并行又有并发。

Golang中的协程:
Golang中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang程序的主线程上可以起多个协程。Golang 中多协程可以实现并行或者并发。
协程:可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang的一大特色就是从语言层面原生支持协程,在函数或者方法前面加go关键字就可创建一个协程。可以说Golang中的协程就是goroutine 。

Golang 中的多协程有点类似其他语言中的多线程。
多协程和多线程:Golang 中每个goroutine (协程) 默认占用内存远比Java 、C的线程少。
OS线程(操作系统线程)一般都有固定的栈内存(通常为 2MB 左右),一个goroutine (协程)占用内存非常小,只有 2KB 左右,多协程 goroutine切换调度开销方面远比线程要少。


1.2、goroutine的使用以及sync.WaitGroup

下面实现创建一个协程,在协程和主线程中分别执行打印语句,每次休眠一秒。

package main

import (
	"fmt"
	"time"
)

func test() {
	for i := 0; i < 3; i++ {
		fmt.Println("test...")
		time.Sleep(time.Second)
	}
}

func main() {
	go test()
	for i := 0; i < 3; i++ {
		fmt.Println("main...")
		time.Sleep(time.Second)
	}
}

在这里插入图片描述


但是有个问题,如果主线程执行的速度比较快呢,我们可以修改一下代码,让主线程跑快一些。
在这里插入图片描述
此时我们发现主线程执行完后,协程不会再继续执行了。这是因为主线程执行完后整个程序就退出了。
所以我们需要使用sync.WaitGroup来让主线程等待协程。

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func test() {
	for i := 0; i < 3; i++ {
		fmt.Println("test...")
		time.Sleep(time.Second)
	}
	wg.Done()
}

func main() {
	wg.Add(1)
	go test()
	for i := 0; i < 3; i++ {
		fmt.Println("main...")
		time.Sleep(100)
	}
	wg.Wait()
}

在这里插入图片描述
可以看到此时主线程执行完后会等待协程执行完,然后才会退出。有点类似于创建进程/线程并进行进程/线程等待回收。
其中:sync.WaitGroup本质上是一个计数器,Add方法表示增加计数器,Done表示让计数器减1,Wait表示等待协程执行完毕。


1.3、启动多个协程

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func test(id int) {
	for i := 1; i <= 3; i++ {
		fmt.Printf("我是协程[%v]..., i=%d\n", id, i)
	}
	wg.Done()
}

func main() {
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go test(i)
	}
	wg.Wait()
}

在这里插入图片描述


1.4、设置Golang并行运行的时候占用的cup数量

可以使用runtime.NumCPU()来获取当前计算机上CPU核心的数量。

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个 OS 线程来同时执行Go代码。默认值是机器上的 CPU 核心数。 例如在一个 8 核心的机器上,调度器会把 Go 代码同时调度到8个OS线程上。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

package main

import (
	"fmt"
	"runtime"
)

func main() {
	num := runtime.NumCPU()
	fmt.Println("CPU数量为:", num)
	runtime.GOMAXPROCS(num - 1)
}

1.5、goroutine统计素数

现在假设要统计1->50000000中有多少素数,最普遍的做法是使用一个for循环来做,如下:

package main

import (
	"fmt"
	"time"
)

func main() {
	u1 := time.Now().Unix()
	// var cnt = 0
	for i := 2; i <= 50000000; i++ {
		flag := true
		for j := 2; j*j <= i; j++ {
			if i%j == 0 {
				flag = false
				break
			}
		}
		if flag {
			// cnt++
		}
	}
	// fmt.Println("共有素数:", cnt)
	u2 := time.Now().Unix()
	fmt.Println("花费时间:", u2-u1)
}

在这里插入图片描述
我们发现运行时间高达12S。下面我们使用goroutine试试看:
我们创建5个协程来完成,每个协程处理一千万个数据。

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func test(x int) {
	for i := (x-1)*10000000 + 1; i <= x*10000000; i++ {
		if i == 1 {
			continue
		}
		flag := true
		for j := 2; j*j <= i; j++ {
			if i%j == 0 {
				flag = false
				break
			}
		}
		if flag {

		}
	}
	wg.Done()
}

func main() {
	u1 := time.Now().Unix()
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go test(i)
	}
	wg.Wait()
	u2 := time.Now().Unix()
	fmt.Println("花费时间:", u2-u1)
}

在这里插入图片描述

可以看到这里我们花费时间大大的降低了,那如果我们想实现几个协程判断素数,其中一个协程进行打印呢?就需要使用到下面的管道了。


2、管道

管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

2.1、管道的操作

1、channel类型。
在这里插入图片描述
2、创建管道需要使用make函数。
在这里插入图片描述

3、channel操作。
管道有发送(send)、接收(receive)和关闭(close)三种操作。其中发送和接收都需要使用<-符号,例子如下:

package main

import "fmt"

func main() {
	// 1.创建管道
	ch := make(chan int, 3)
	// 2.给管道发送数据
	ch <- 10
	ch <- 20
	ch <- 30
	// 3.从管道获取数据
	a := <-ch
	fmt.Println(a)
	fmt.Printf("值: %v, 类型: %T, 长度: %d, 容量: %d\n", ch, ch, len(ch), cap(ch))
}

在这里插入图片描述
管道有点类似队列的结构特点,所以获取的a值为10。另外打印管道返回的是一个地址,容量为3,由于我们取出了一个数据,所以长度为2。

4、管道是引用数据类型。

package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	ch2 := ch
	ch2 <- 30
	<-ch
	<-ch
	fmt.Println(<-ch2)
}

在这里插入图片描述

5、管道阻塞
当管道中没有数据,再去取就会阻塞。当管道中数据写满了,再去取也会阻塞。

package main

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	ch <- 30
	ch <- 40
}

在这里插入图片描述

package main

import "fmt"

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	<-ch
	<-ch
	num := <-ch
	fmt.Println(num)
}

在这里插入图片描述


6、for range遍历管道

package main

import "fmt"

func main() {
	var ch = make(chan int, 5)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	for v := range ch {
		fmt.Println(v)
	}
}

在这里插入图片描述

注意:for range遍历管道只有一个返回值,并且会报错。
解决办法:调用close函数关闭管道,这样for range遍历结束就会退出,不会报错。

package main

import "fmt"

func main() {
	var ch = make(chan int, 5)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	close(ch)
	for v := range ch {
		fmt.Println(v)
	}
}

在这里插入图片描述

另外还可以直接使用for循环遍历,不过需要知道管道中的元素个数。

package main

import "fmt"

func main() {
	var ch = make(chan int, 5)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	for i := 0; i < len(ch); i++ {
		fmt.Println(<-ch)
	}
}

在这里插入图片描述


2.2、协程和管道协同

7、需求:使用goroutine和channel协同工作。

  • 开启一个协程向管道中写入数据。
  • 开启一个协程丛管道中读取数据。
  • 主线程必须等协程操作完才能退出。
package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func fn1(ch chan int) {
	for i := 1; i <= 5; i++ {
		ch <- i
		fmt.Println("协程[1]向管道中写入数据:", i)
		time.Sleep(500)
	}
	close(ch)
	wg.Done()
}

func fn2(ch chan int) {
	for i := 1; i <= 5; i++ {
		x := <-ch
		fmt.Println("协程[2]从管道中读取数据:", x)
		time.Sleep(500)
	}
	wg.Done()
}

func main() {
	var ch = make(chan int, 5)
	wg.Add(2)
	go fn1(ch)
	go fn2(ch)
	wg.Wait()
}

在这里插入图片描述
这里先读取是因为协程1已经把数据写入了,只不过还没打印出来就被协程2取走并打印输出了。管道是自带同步和互斥机制的,所以哪怕让协程2休眠时间远短于协程1,协程2也会阻塞住等待。


再来看另外一个例子,使用go关键字配合匿名自执行函数创建协程。

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	var ch = make(chan int, 3)
	wg.Add(1)
	go func() {
		for i := 1; i <= 3; i++ {
			num := <-ch
			fmt.Println(num)
		}
		wg.Done()
	}()
	wg.Add(1)
	go func() {
		for i := 1; i <= 3; i++ {
			time.Sleep(time.Second)
			ch <- i
		}
		wg.Done()
	}()
	wg.Wait()
}

在这里插入图片描述
这里有点类似于C++中通过lambda表达式创建线程执行。


需求:改善上面实现的素数判断,还是创建多个协程来判断素数,但是我们还要创建一个协程来打印素数,这就需要实现协程间通信,所以就需要使用协程+管道。

  • 创建一个管道intChain和一个协程,这个协程负责写入需要判断的值,然后判断素数的协程从管道intChain中获取数据进行判断。
  • 创建16个协程和一个管道primeChain,这16个协程从上面的管道intChain中获取数据进行判断,如果是素数就写入到新创建的管道primeChain中。
  • 创建一个打印素数的协程,该协程从存放素数的管道primeChain中获取数据打印输出。
  • 主线程进行等待

但是还要注意,我们打印素数协程是使用for range遍历管道的,所以需要close管道,而我们不能在执行方法中随意close管道,因为可能其他协程还要写入,所以还需要一个exitChain来标识,当判断素数的协程执行完就向exitChain写入true。然后我们另外创建一个协程来读取exitChain,当十六次全部读取完毕就可以关闭primeChain,这样for range就不会出错了。
实现代码如下:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func putNum(intChain chan int) {
	for i := 2; i <= 100; i++ {
		intChain <- i
	}
	close(intChain)
	wg.Done()
}

func isPrime(intChain chan int, primeChain chan int, exitChain chan bool) {
	for v := range intChain {
		flag := true
		for j := 2; j*j <= v; j++ {
			if v%j == 0 {
				flag = false
				break
			}
		}
		if flag {
			primeChain <- v
		}
	}
	exitChain <- true
	wg.Done()
}

func printPrime(primeChain chan int) {
	for v := range primeChain {
		fmt.Printf("%v是素数\n", v)
	}
	wg.Done()
}

func main() {
	intChan := make(chan int, 1000)
	primeChan := make(chan int, 1000)
	exitChan := make(chan bool, 16)

	wg.Add(1)
	go putNum(intChan)

	for i := 0; i < 16; i++ {
		wg.Add(1)
		go isPrime(intChan, primeChan, exitChan)
	}

	wg.Add(1)
	go printPrime(primeChan)

	wg.Add(1)
	go func() {
		for i := 0; i < 16; i++ {
			<-exitChan
		}
		close(primeChan)
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("执行完毕...")
}

在这里插入图片描述


2.3、单向管道

有的时候我们会将管道作为参数在多个任务函数间传递, 很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收。

单向管道的实现如下,在chan左边或右边添加<-。

// 声明为只写
var chan1 chan<- int
chan1 = make(chan int, 3)

// 声明为只读
var chan2 <-chan int
chan2 = make(chan int, 3)

举个例子,创建两个协程实现一个协程向管道中写入数据,另一个协程从管道中读取数据。按之前的写法如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func fn1(ch chan int) {
	for i := 1; i <= 5; i++ {
		ch <- i
		fmt.Println("协程[1]向管道中写入:", i)
		time.Sleep(time.Millisecond * 100)
	}
	close(ch)
	wg.Done()
}

func fn2(ch chan int) {
	for v := range ch {
		fmt.Println("协程[2]从管道中读取:", v)
		time.Sleep(time.Millisecond * 100)
	}
	wg.Done()
}

func main() {
	var ch = make(chan int, 5)
	wg.Add(1)
	go fn1(ch)
	wg.Add(1)
	go fn2(ch)
	wg.Wait()
}

在这里插入图片描述

这么写其实也没有什么问题,但是我们可以在函数参数上进一步限制管道,对于fn1来说,该管道只进行写入,对于fn2来说,该管道只进行读取,所以可以修改成下面的代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func fn1(ch chan<- int) {
	for i := 1; i <= 5; i++ {
		ch <- i
		fmt.Println("协程[1]向管道中写入:", i)
		time.Sleep(time.Millisecond * 100)
	}
	close(ch)
	wg.Done()
}

func fn2(ch <-chan int) {
	for v := range ch {
		fmt.Println("协程[2]从管道中读取:", v)
		time.Sleep(time.Millisecond * 100)
	}
	wg.Done()
}

func main() {
	var ch = make(chan int, 5)
	wg.Add(1)
	go fn1(ch)
	wg.Add(1)
	go fn2(ch)
	wg.Wait()
}

2.4、多路复用之select

在某些场景下我们需要同时从多个管道中读取数据,这时候就可以使用多路复用技术。多路复用本质上是一种就绪事件的通知机制。

select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送) 过程。select会一直等待,直到底层事件就绪时, 就会执行case分支对应的语句。 具体格式如下:
在这里插入图片描述
当读取完所有数据后就会走default。

例如下面读取两个管道中的数据,可以创建两个协程来读取,也可以使用多路复用。

package main

import "fmt"

func main() {
	intChan := make(chan int, 10)
	for i := 1; i <= 10; i++ {
		intChan <- i
	}
	strChan := make(chan string, 5)
	for i := 1; i <= 5; i++ {
		strChan <- fmt.Sprintf("hello-%d", i)
	}

	for {
		select {
		case v := <-intChan:
			fmt.Println("从intChan中获取数据:", v)
		case v := <-strChan:
			fmt.Println("从strChan中获取数据:", v)
		default:
			fmt.Println("数据获取完毕...")
			return
		}
	}
}

注意:
1、走到default表示管道中的数据都获取完毕了,由于外层是for死循环,所以需要return退出。
2、使用select来获取管道中的数据,不需要close管道。


2.5、解决协程中出现的异常问题

package main

import (
	"fmt"
	"time"
)

func print() {
	for i := 0; i < 5; i++ {
		fmt.Println("hello...")
	}
}

func test() {

	var m map[string]string
	m["username"] = "张三"
}

func main() {
	go print()
	go test()
	time.Sleep(time.Second)
}

在上面的test中,由于我们只是声明了m,没有使用make函数来创建空间,所以该协程出现异常导致整个程序崩溃,类似于C/C++中线程出现异常导致整个进程崩溃。
所以我们可以使用defer + recover来解决。

package main

import (
	"fmt"
	"time"
)

func print() {
	for i := 0; i < 5; i++ {
		fmt.Println("hello...")
	}
}

func test() {
	defer func() {
		err := recover()
		if err != nil {
			fmt.Println("err:", err)
		}
	}()
	var m map[string]string
	m["username"] = "张三"
}

func main() {
	go print()
	go test()
	time.Sleep(time.Second)
}

3、Golang协程同步与互斥

3.1、互斥锁

多协程访问共享资源不加以保护就会出问题,下面用多协程模拟一轮抢票。

package main

import (
	"fmt"
	"sync"
	"time"
)

var ticket = 10000
var wg sync.WaitGroup

func GetTicket(i int) {
	for {
		if ticket > 0 {
			time.Sleep(time.Microsecond * 1000)
			fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)
			ticket--
		} else {
			break
		}
	}
	wg.Done()
}

func main() {
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go GetTicket(i)
	}
	wg.Wait()
}

在这里插入图片描述

多协程共享全局变量,在进行抢票的时候我们发现多个协程竟然抢到同一张票,所以我们需要加锁保护。Golang中的互斥量使用很简单,只需要在全局定义一个sync.Mutex对象,调用其中的Lock和Unlock方法即可。

package main

import (
	"fmt"
	"sync"
	"time"
)

var ticket = 10000
var wg sync.WaitGroup
var mutex sync.Mutex

func GetTicket(i int) {
	for {
		mutex.Lock()
		if ticket > 0 {
			time.Sleep(time.Microsecond * 1000)
			fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)
			ticket--
		} else {
			mutex.Unlock()
			break
		}
		mutex.Unlock()
	}
	wg.Done()
}

func main() {
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go GetTicket(i)
	}
	wg.Wait()
}

在这里插入图片描述


3.2、读写锁

读写锁保证任何时刻只有读者或者只有写者,如果是写者只能有一个写者,如果是读者可以有多个读者。使用如下:

var rwMtx sync.RWMutex  // 定义读写锁
rwMtx.Lock()   //写者加锁
rwMtx.Unlock() //写者解锁
rwMtx.RLock() // 读者加锁
rwMtx.RUnlock() // 读者解锁

下面创建一个协程协程写入数据,另一批协程读取数据。

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var rwMtx sync.RWMutex

func read() {
	rwMtx.RLock()
	fmt.Println("协程读取数据...")
	rwMtx.RUnlock()
	wg.Done()
}

func write() {
	rwMtx.Lock()
	fmt.Println("协程写入数据...")
	rwMtx.Unlock()
	wg.Done()
}

func main() {
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go read()
	}
	wg.Wait()
}

在这里插入图片描述


3.3、条件变量

条件变量是用来实现协程同步和互斥的。使用如下:

var mutex sync.Mutex
var cond = sync.NewCond(&mutex) // 传入锁初始化条件变量
cond.Wait()      // 等待条件变量
cond.Signal()    // 唤醒一个协程
cond.Broadcast() // 唤醒所有协程
cond.L.Lock()    // 加锁,本质使用的是传入的mutex锁
cond.L.Unlock()  // 解锁,本质使用的是传入的mutex锁

加锁可以直接使用条件变量提供的方法加锁,也可以使用我们定义的锁来加锁,但是要保证是同一把锁。

下面使用条件变量实现协程同步和互斥,需求:两个协程交替打印奇数和偶数。

package main

import (
	"fmt"
	"sync"
	"time"
)

var mutex sync.Mutex
var cond = sync.NewCond(&mutex)
var wg sync.WaitGroup
var x = 1
var flag = true

func fn1() {
	for {
		cond.L.Lock()
		for !flag {
			cond.Wait()
		}
		fmt.Println("协程[1]打印数据:", x)
		x++
		flag = false
		time.Sleep(time.Second)
		cond.Signal()
		cond.L.Unlock()
	}
	wg.Done()
}

func fn2() {
	for {
		cond.L.Lock()
		for flag {
			cond.Wait()
		}
		fmt.Println("协程[2]打印数据:", x)
		x++
		flag = true
		time.Sleep(time.Second)
		cond.Signal()
		cond.L.Unlock()
	}
	wg.Done()
}

func main() {
	wg.Add(1)
	go fn1()
	wg.Add(1)
	go fn2()
	wg.Wait()
}

在这里插入图片描述


由于管道自带同步互斥保护机制,所以也可以使用协程+管道来实现。

package main

import (
	"fmt"
	"sync"
	"time"
)

var x = 1
var wg sync.WaitGroup

func fn1(ch1 <-chan bool, ch2 chan<- bool) {
	for {
		<-ch1
		fmt.Println("协程[1]打印数据:", x)
		time.Sleep(time.Second)
		x++
		ch2 <- true
	}
	wg.Done()
}

func fn2(ch1 chan<- bool, ch2 <-chan bool) {
	for {
		<-ch2
		fmt.Println("协程[2]打印数据:", x)
		time.Sleep(time.Second)
		x++
		ch1 <- true
	}
	wg.Done()
}

func main() {
	var ch1 = make(chan bool, 1)
	var ch2 = make(chan bool, 1)
	ch1 <- true
	wg.Add(1)
	go fn1(ch1, ch2)
	wg.Add(2)
	go fn2(ch1, ch2)
	wg.Wait()
}

在这里插入图片描述