协程和管道
1、协程
1.1、进程、线程和协程
进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有 5 种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态。
线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位
并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。
并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。
通俗的讲多线程程序在单核CPU上面运行就是并发,多线程程序在多核CUP上运行就是并行,如果线程数大于CPU核数,则多线程程序在多个CPU上面运行既有并行又有并发。
Golang中的协程:
Golang中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang程序的主线程上可以起多个协程。Golang 中多协程可以实现并行或者并发。
协程:可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang的一大特色就是从语言层面原生支持协程,在函数或者方法前面加go关键字就可创建一个协程。可以说Golang中的协程就是goroutine 。
Golang 中的多协程有点类似其他语言中的多线程。
多协程和多线程:Golang 中每个goroutine (协程) 默认占用内存远比Java 、C的线程少。
OS线程(操作系统线程)一般都有固定的栈内存(通常为 2MB 左右),一个goroutine (协程)占用内存非常小,只有 2KB 左右,多协程 goroutine切换调度开销方面远比线程要少。
1.2、goroutine的使用以及sync.WaitGroup
下面实现创建一个协程,在协程和主线程中分别执行打印语句,每次休眠一秒。
package main
import (
"fmt"
"time"
)
func test() {
for i := 0; i < 3; i++ {
fmt.Println("test...")
time.Sleep(time.Second)
}
}
func main() {
go test()
for i := 0; i < 3; i++ {
fmt.Println("main...")
time.Sleep(time.Second)
}
}
但是有个问题,如果主线程执行的速度比较快呢,我们可以修改一下代码,让主线程跑快一些。
此时我们发现主线程执行完后,协程不会再继续执行了。这是因为主线程执行完后整个程序就退出了。
所以我们需要使用sync.WaitGroup来让主线程等待协程。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func test() {
for i := 0; i < 3; i++ {
fmt.Println("test...")
time.Sleep(time.Second)
}
wg.Done()
}
func main() {
wg.Add(1)
go test()
for i := 0; i < 3; i++ {
fmt.Println("main...")
time.Sleep(100)
}
wg.Wait()
}
可以看到此时主线程执行完后会等待协程执行完,然后才会退出。有点类似于创建进程/线程并进行进程/线程等待回收。
其中:sync.WaitGroup本质上是一个计数器,Add方法表示增加计数器,Done表示让计数器减1,Wait表示等待协程执行完毕。
1.3、启动多个协程
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func test(id int) {
for i := 1; i <= 3; i++ {
fmt.Printf("我是协程[%v]..., i=%d\n", id, i)
}
wg.Done()
}
func main() {
for i := 1; i <= 5; i++ {
wg.Add(1)
go test(i)
}
wg.Wait()
}
1.4、设置Golang并行运行的时候占用的cup数量
可以使用runtime.NumCPU()来获取当前计算机上CPU核心的数量。
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个 OS 线程来同时执行Go代码。默认值是机器上的 CPU 核心数。 例如在一个 8 核心的机器上,调度器会把 Go 代码同时调度到8个OS线程上。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
package main
import (
"fmt"
"runtime"
)
func main() {
num := runtime.NumCPU()
fmt.Println("CPU数量为:", num)
runtime.GOMAXPROCS(num - 1)
}
1.5、goroutine统计素数
现在假设要统计1->50000000中有多少素数,最普遍的做法是使用一个for循环来做,如下:
package main
import (
"fmt"
"time"
)
func main() {
u1 := time.Now().Unix()
// var cnt = 0
for i := 2; i <= 50000000; i++ {
flag := true
for j := 2; j*j <= i; j++ {
if i%j == 0 {
flag = false
break
}
}
if flag {
// cnt++
}
}
// fmt.Println("共有素数:", cnt)
u2 := time.Now().Unix()
fmt.Println("花费时间:", u2-u1)
}
我们发现运行时间高达12S。下面我们使用goroutine试试看:
我们创建5个协程来完成,每个协程处理一千万个数据。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func test(x int) {
for i := (x-1)*10000000 + 1; i <= x*10000000; i++ {
if i == 1 {
continue
}
flag := true
for j := 2; j*j <= i; j++ {
if i%j == 0 {
flag = false
break
}
}
if flag {
}
}
wg.Done()
}
func main() {
u1 := time.Now().Unix()
for i := 1; i <= 5; i++ {
wg.Add(1)
go test(i)
}
wg.Wait()
u2 := time.Now().Unix()
fmt.Println("花费时间:", u2-u1)
}
可以看到这里我们花费时间大大的降低了,那如果我们想实现几个协程判断素数,其中一个协程进行打印呢?就需要使用到下面的管道了。
2、管道
管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
2.1、管道的操作
1、channel类型。
2、创建管道需要使用make函数。
3、channel操作。
管道有发送(send)、接收(receive)和关闭(close)三种操作。其中发送和接收都需要使用<-符号,例子如下:
package main
import "fmt"
func main() {
// 1.创建管道
ch := make(chan int, 3)
// 2.给管道发送数据
ch <- 10
ch <- 20
ch <- 30
// 3.从管道获取数据
a := <-ch
fmt.Println(a)
fmt.Printf("值: %v, 类型: %T, 长度: %d, 容量: %d\n", ch, ch, len(ch), cap(ch))
}
管道有点类似队列的结构特点,所以获取的a值为10。另外打印管道返回的是一个地址,容量为3,由于我们取出了一个数据,所以长度为2。
4、管道是引用数据类型。
package main
import "fmt"
func main() {
ch := make(chan int, 3)
ch <- 10
ch <- 20
ch2 := ch
ch2 <- 30
<-ch
<-ch
fmt.Println(<-ch2)
}
5、管道阻塞
当管道中没有数据,再去取就会阻塞。当管道中数据写满了,再去取也会阻塞。
package main
func main() {
ch := make(chan int, 3)
ch <- 10
ch <- 20
ch <- 30
ch <- 40
}
package main
import "fmt"
func main() {
ch := make(chan int, 3)
ch <- 10
ch <- 20
<-ch
<-ch
num := <-ch
fmt.Println(num)
}
6、for range遍历管道
package main
import "fmt"
func main() {
var ch = make(chan int, 5)
for i := 1; i <= 5; i++ {
ch <- i
}
for v := range ch {
fmt.Println(v)
}
}
注意:for range遍历管道只有一个返回值,并且会报错。
解决办法:调用close函数关闭管道,这样for range遍历结束就会退出,不会报错。
package main
import "fmt"
func main() {
var ch = make(chan int, 5)
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
for v := range ch {
fmt.Println(v)
}
}
另外还可以直接使用for循环遍历,不过需要知道管道中的元素个数。
package main
import "fmt"
func main() {
var ch = make(chan int, 5)
for i := 1; i <= 5; i++ {
ch <- i
}
for i := 0; i < len(ch); i++ {
fmt.Println(<-ch)
}
}
2.2、协程和管道协同
7、需求:使用goroutine和channel协同工作。
- 开启一个协程向管道中写入数据。
- 开启一个协程丛管道中读取数据。
- 主线程必须等协程操作完才能退出。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func fn1(ch chan int) {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Println("协程[1]向管道中写入数据:", i)
time.Sleep(500)
}
close(ch)
wg.Done()
}
func fn2(ch chan int) {
for i := 1; i <= 5; i++ {
x := <-ch
fmt.Println("协程[2]从管道中读取数据:", x)
time.Sleep(500)
}
wg.Done()
}
func main() {
var ch = make(chan int, 5)
wg.Add(2)
go fn1(ch)
go fn2(ch)
wg.Wait()
}
这里先读取是因为协程1已经把数据写入了,只不过还没打印出来就被协程2取走并打印输出了。管道是自带同步和互斥机制的,所以哪怕让协程2休眠时间远短于协程1,协程2也会阻塞住等待。
再来看另外一个例子,使用go关键字配合匿名自执行函数创建协程。
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
var ch = make(chan int, 3)
wg.Add(1)
go func() {
for i := 1; i <= 3; i++ {
num := <-ch
fmt.Println(num)
}
wg.Done()
}()
wg.Add(1)
go func() {
for i := 1; i <= 3; i++ {
time.Sleep(time.Second)
ch <- i
}
wg.Done()
}()
wg.Wait()
}
这里有点类似于C++中通过lambda表达式创建线程执行。
需求:改善上面实现的素数判断,还是创建多个协程来判断素数,但是我们还要创建一个协程来打印素数,这就需要实现协程间通信,所以就需要使用协程+管道。
- 创建一个管道intChain和一个协程,这个协程负责写入需要判断的值,然后判断素数的协程从管道intChain中获取数据进行判断。
- 创建16个协程和一个管道primeChain,这16个协程从上面的管道intChain中获取数据进行判断,如果是素数就写入到新创建的管道primeChain中。
- 创建一个打印素数的协程,该协程从存放素数的管道primeChain中获取数据打印输出。
- 主线程进行等待
但是还要注意,我们打印素数协程是使用for range遍历管道的,所以需要close管道,而我们不能在执行方法中随意close管道,因为可能其他协程还要写入,所以还需要一个exitChain来标识,当判断素数的协程执行完就向exitChain写入true。然后我们另外创建一个协程来读取exitChain,当十六次全部读取完毕就可以关闭primeChain,这样for range就不会出错了。
实现代码如下:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func putNum(intChain chan int) {
for i := 2; i <= 100; i++ {
intChain <- i
}
close(intChain)
wg.Done()
}
func isPrime(intChain chan int, primeChain chan int, exitChain chan bool) {
for v := range intChain {
flag := true
for j := 2; j*j <= v; j++ {
if v%j == 0 {
flag = false
break
}
}
if flag {
primeChain <- v
}
}
exitChain <- true
wg.Done()
}
func printPrime(primeChain chan int) {
for v := range primeChain {
fmt.Printf("%v是素数\n", v)
}
wg.Done()
}
func main() {
intChan := make(chan int, 1000)
primeChan := make(chan int, 1000)
exitChan := make(chan bool, 16)
wg.Add(1)
go putNum(intChan)
for i := 0; i < 16; i++ {
wg.Add(1)
go isPrime(intChan, primeChan, exitChan)
}
wg.Add(1)
go printPrime(primeChan)
wg.Add(1)
go func() {
for i := 0; i < 16; i++ {
<-exitChan
}
close(primeChan)
wg.Done()
}()
wg.Wait()
fmt.Println("执行完毕...")
}
2.3、单向管道
有的时候我们会将管道作为参数在多个任务函数间传递, 很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收。
单向管道的实现如下,在chan左边或右边添加<-。
// 声明为只写
var chan1 chan<- int
chan1 = make(chan int, 3)
// 声明为只读
var chan2 <-chan int
chan2 = make(chan int, 3)
举个例子,创建两个协程实现一个协程向管道中写入数据,另一个协程从管道中读取数据。按之前的写法如下:
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func fn1(ch chan int) {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Println("协程[1]向管道中写入:", i)
time.Sleep(time.Millisecond * 100)
}
close(ch)
wg.Done()
}
func fn2(ch chan int) {
for v := range ch {
fmt.Println("协程[2]从管道中读取:", v)
time.Sleep(time.Millisecond * 100)
}
wg.Done()
}
func main() {
var ch = make(chan int, 5)
wg.Add(1)
go fn1(ch)
wg.Add(1)
go fn2(ch)
wg.Wait()
}
这么写其实也没有什么问题,但是我们可以在函数参数上进一步限制管道,对于fn1来说,该管道只进行写入,对于fn2来说,该管道只进行读取,所以可以修改成下面的代码:
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func fn1(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Println("协程[1]向管道中写入:", i)
time.Sleep(time.Millisecond * 100)
}
close(ch)
wg.Done()
}
func fn2(ch <-chan int) {
for v := range ch {
fmt.Println("协程[2]从管道中读取:", v)
time.Sleep(time.Millisecond * 100)
}
wg.Done()
}
func main() {
var ch = make(chan int, 5)
wg.Add(1)
go fn1(ch)
wg.Add(1)
go fn2(ch)
wg.Wait()
}
2.4、多路复用之select
在某些场景下我们需要同时从多个管道中读取数据,这时候就可以使用多路复用技术。多路复用本质上是一种就绪事件的通知机制。
select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送) 过程。select会一直等待,直到底层事件就绪时, 就会执行case分支对应的语句。 具体格式如下:
当读取完所有数据后就会走default。
例如下面读取两个管道中的数据,可以创建两个协程来读取,也可以使用多路复用。
package main
import "fmt"
func main() {
intChan := make(chan int, 10)
for i := 1; i <= 10; i++ {
intChan <- i
}
strChan := make(chan string, 5)
for i := 1; i <= 5; i++ {
strChan <- fmt.Sprintf("hello-%d", i)
}
for {
select {
case v := <-intChan:
fmt.Println("从intChan中获取数据:", v)
case v := <-strChan:
fmt.Println("从strChan中获取数据:", v)
default:
fmt.Println("数据获取完毕...")
return
}
}
}
注意:
1、走到default表示管道中的数据都获取完毕了,由于外层是for死循环,所以需要return退出。
2、使用select来获取管道中的数据,不需要close管道。
2.5、解决协程中出现的异常问题
package main
import (
"fmt"
"time"
)
func print() {
for i := 0; i < 5; i++ {
fmt.Println("hello...")
}
}
func test() {
var m map[string]string
m["username"] = "张三"
}
func main() {
go print()
go test()
time.Sleep(time.Second)
}
在上面的test中,由于我们只是声明了m,没有使用make函数来创建空间,所以该协程出现异常导致整个程序崩溃,类似于C/C++中线程出现异常导致整个进程崩溃。
所以我们可以使用defer + recover来解决。
package main
import (
"fmt"
"time"
)
func print() {
for i := 0; i < 5; i++ {
fmt.Println("hello...")
}
}
func test() {
defer func() {
err := recover()
if err != nil {
fmt.Println("err:", err)
}
}()
var m map[string]string
m["username"] = "张三"
}
func main() {
go print()
go test()
time.Sleep(time.Second)
}
3、Golang协程同步与互斥
3.1、互斥锁
多协程访问共享资源不加以保护就会出问题,下面用多协程模拟一轮抢票。
package main
import (
"fmt"
"sync"
"time"
)
var ticket = 10000
var wg sync.WaitGroup
func GetTicket(i int) {
for {
if ticket > 0 {
time.Sleep(time.Microsecond * 1000)
fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)
ticket--
} else {
break
}
}
wg.Done()
}
func main() {
for i := 1; i <= 4; i++ {
wg.Add(1)
go GetTicket(i)
}
wg.Wait()
}
多协程共享全局变量,在进行抢票的时候我们发现多个协程竟然抢到同一张票,所以我们需要加锁保护。Golang中的互斥量使用很简单,只需要在全局定义一个sync.Mutex对象,调用其中的Lock和Unlock方法即可。
package main
import (
"fmt"
"sync"
"time"
)
var ticket = 10000
var wg sync.WaitGroup
var mutex sync.Mutex
func GetTicket(i int) {
for {
mutex.Lock()
if ticket > 0 {
time.Sleep(time.Microsecond * 1000)
fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)
ticket--
} else {
mutex.Unlock()
break
}
mutex.Unlock()
}
wg.Done()
}
func main() {
for i := 1; i <= 4; i++ {
wg.Add(1)
go GetTicket(i)
}
wg.Wait()
}
3.2、读写锁
读写锁保证任何时刻只有读者或者只有写者,如果是写者只能有一个写者,如果是读者可以有多个读者。使用如下:
var rwMtx sync.RWMutex // 定义读写锁
rwMtx.Lock() //写者加锁
rwMtx.Unlock() //写者解锁
rwMtx.RLock() // 读者加锁
rwMtx.RUnlock() // 读者解锁
下面创建一个协程协程写入数据,另一批协程读取数据。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
var rwMtx sync.RWMutex
func read() {
rwMtx.RLock()
fmt.Println("协程读取数据...")
rwMtx.RUnlock()
wg.Done()
}
func write() {
rwMtx.Lock()
fmt.Println("协程写入数据...")
rwMtx.Unlock()
wg.Done()
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go read()
}
wg.Wait()
}
3.3、条件变量
条件变量是用来实现协程同步和互斥的。使用如下:
var mutex sync.Mutex
var cond = sync.NewCond(&mutex) // 传入锁初始化条件变量
cond.Wait() // 等待条件变量
cond.Signal() // 唤醒一个协程
cond.Broadcast() // 唤醒所有协程
cond.L.Lock() // 加锁,本质使用的是传入的mutex锁
cond.L.Unlock() // 解锁,本质使用的是传入的mutex锁
加锁可以直接使用条件变量提供的方法加锁,也可以使用我们定义的锁来加锁,但是要保证是同一把锁。
下面使用条件变量实现协程同步和互斥,需求:两个协程交替打印奇数和偶数。
package main
import (
"fmt"
"sync"
"time"
)
var mutex sync.Mutex
var cond = sync.NewCond(&mutex)
var wg sync.WaitGroup
var x = 1
var flag = true
func fn1() {
for {
cond.L.Lock()
for !flag {
cond.Wait()
}
fmt.Println("协程[1]打印数据:", x)
x++
flag = false
time.Sleep(time.Second)
cond.Signal()
cond.L.Unlock()
}
wg.Done()
}
func fn2() {
for {
cond.L.Lock()
for flag {
cond.Wait()
}
fmt.Println("协程[2]打印数据:", x)
x++
flag = true
time.Sleep(time.Second)
cond.Signal()
cond.L.Unlock()
}
wg.Done()
}
func main() {
wg.Add(1)
go fn1()
wg.Add(1)
go fn2()
wg.Wait()
}
由于管道自带同步互斥保护机制,所以也可以使用协程+管道来实现。
package main
import (
"fmt"
"sync"
"time"
)
var x = 1
var wg sync.WaitGroup
func fn1(ch1 <-chan bool, ch2 chan<- bool) {
for {
<-ch1
fmt.Println("协程[1]打印数据:", x)
time.Sleep(time.Second)
x++
ch2 <- true
}
wg.Done()
}
func fn2(ch1 chan<- bool, ch2 <-chan bool) {
for {
<-ch2
fmt.Println("协程[2]打印数据:", x)
time.Sleep(time.Second)
x++
ch1 <- true
}
wg.Done()
}
func main() {
var ch1 = make(chan bool, 1)
var ch2 = make(chan bool, 1)
ch1 <- true
wg.Add(1)
go fn1(ch1, ch2)
wg.Add(2)
go fn2(ch1, ch2)
wg.Wait()
}