通常程序以独立方式编写,简单易维护,但一些时候处于性能考量就需要并发,go语言的并发是基于csp模型的,通信顺序进程(Communicating Sequential Processes,CSP),不像其他语言对数据加锁,而是通过使用通道channel在gorountine中传递信息
并发与并行
并发的概念是离不开程序进程与线程的概念的,通常进程指一个运行的程序,其中包含了线程,启动时的线程是主线程,操作系统会调度线程,1.5前每个可用的物理处理器分配一个逻辑处理器,1.5后整个应用程序则只分配一个逻辑处理器,来调度所有的goroutine
如图,调度器将goroutine排列到队列里,绑定线程,将goroutine依次执行,如果系统调用阻塞的话,就新开线程等待返回,同时继续处理下一个routine,而针对网络io的话,阻塞时routine会与逻辑处理器分离,放到网络轮询器的运行时上,当网络轮询器指示操作就绪时,再回到逻辑处理器执行
逻辑处理器数量没有上限,但线程数默认10000,超过就会崩溃,可以通过调用runtime/debug包的SetMaxThreads方法来更改
并发不同于并行,它不是同时执行多个,而是调配管理多个,在合适的时机处理合适的任务,在有限的硬件资源下,具有更好的表现
如果要实现并行,就要有多个逻辑处理器,调度器会平均分配routine到多个线程,并且要有多个物理处理器,这样才能达到并行的效果
如图是并发与并行的区别
goroutine
// 这个示例程序展示如何创建goroutine
// 以及调度器的行为
package main
import (
"fmt"
"runtime"
"sync"
)
// main是所有Go程序的入口
func main() {
// 分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)
// wg用来等待程序完成
// 计数加2,表示要等待两个goroutine
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("Start Goroutines")
// 声明一个匿名函数,并创建一个goroutine
go func() {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
// 显示字母表3次
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// 声明一个匿名函数,并创建一个goroutine
go func() {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
// 显示字母表3次
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// 等待goroutine结束
fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("\nTerminating Program")
}
这里两个匿名函数分别输出26个小写和大写字母,使用wait等待返回后,逻辑调度器停止,运行结果是
这里由于一个goroutine执行的太快,还没切换到第二个goroutine就已经全部输出完了
其中waitgroup是指两个运行的goroutine,done是通知执行完毕,这样做是防止一个routine执行太久,可以手动停止
如下是一个长时间运行的go示例,可以观察这一现象
// 这个示例程序展示goroutine调度器是如何在单个线程上
// 切分时间片的
package main
import (
"fmt"
"runtime"
"sync"
)
// wg用来等待程序完成
var wg sync.WaitGroup
// main是所有Go程序的入口
func main() {
// 分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)
// 计数加2,表示要等待两个goroutine
wg.Add(2)
// 创建两个goroutine
fmt.Println("Create Goroutines")
go printPrime("A")
go printPrime("B")
// 等待goroutine结束
fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("Terminating Program")
}
// printPrime 显示5000以内的素数值
func printPrime(prefix string) {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
next:
for outer := 2; outer < 5000; outer++ {
for inner := 2; inner < outer; inner++ {
if outer%inner == 0 {
continue next
}
}
fmt.Printf("%s:%d\n", prefix, outer)
}
fmt.Println("Completed", prefix)
}
这里goroutine A先查找素数,耗时较长,这里调度器会调度到B,B输出4591时,又被调度器切换到A,然后A输出到4567,又切换到B,这次B输出完之后,线程接收到返回就切换回A并输出完剩下的数
这里import “runtime”
// 给每个可用的核心分配一个逻辑处理器,runtime.GOMAXPROCS(runtime.NumCPU()) ,runtime.NumCPU返回物理cpu的数量
package main
import (
"fmt"
"runtime"
"sync"
)
// main是所有Go程序的入口
func main() {
// 分配2个逻辑处理器给调度器使用
runtime.GOMAXPROCS(2)
// wg用来等待程序完成
// 计数加2,表示要等待两个goroutine
var wg sync.WaitGroup
wg.Add(2)
fmt.Println("Start Goroutines")
// 声明一个匿名函数,并创建一个goroutine
go func() {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
// 显示字母表3次
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// 声明一个匿名函数,并创建一个goroutine
go func() {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
// 显示字母表3次
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// 等待goroutine结束
fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("\nTerminating Program")
}
只有在有多个逻辑处理器且可以同时让每个goroutine运行在一个可用的物理处理器上的时候,goroutine才会并行运行。
竞争状态
并行中,两个多个goroutine访问一个资源,并且没有互相同步,这就是竞争状态,通常这种读写必须是原子化的,即只有一个goroutine同时操作
// 这个示例程序展示如何在程序里造成竞争状态
// 实际上不希望出现这种情况
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// counter是所有goroutine都要增加其值的变量
counter int
// wg用来等待程序结束
wg sync.WaitGroup
)
// main是所有Go程序的入口
func main() {
// 计数加2,表示要等待两个goroutine
wg.Add(2)
// 创建两个goroutine
go incCounter(1)
go incCounter(2)
// 等待goroutine结束
wg.Wait()
fmt.Println("Final Counter:", counter)
}
// incCounter增加包里counter变量的值
func incCounter(id int) {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
for count := 0; count < 2; count++ {
// 捕获counter的值
value := counter
// 当前goroutine从线程退出,并放回到队列
runtime.Gosched()
// 增加本地value变量的值
value++
// 将该值保存回counter
counter = value
}
}
如图,这个竞争状态下,两个goroutine值没有叠加,而是各自计算,互相覆盖,每个线程都会将值存到一个副本value中,然后再写入,goshed是将goroutine退出线程,让其他goroutine可以运行,便于观察
go语言有race工具可以用来观察竞争
go build -race // 用竞争检测器标志来编译程序
./example
// 运行程序
==================
WARNING: DATA RACE
Write by goroutine 5:
main.incCounter()
/example/main.go:49 +0x96
Previous read by goroutine 6:
main.incCounter()
/example/main.go:40 +0x66
Goroutine 5 (running) created at:
main.main()
/example/main.go:25 +0x5c
Goroutine 6 (running) created at:
main.main()
/example/main.go:26 +0x73
==================
Final Counter: 2
Found 1 data race(s)
上一部分指出了有竞争状态的代码,分别是读写counter操作,可以通过锁来解决竞争问题
锁与原子函数
原子函数通过底层原理来规定访问资源的顺序
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
// counter是所有goroutine都要增加其值的变量
counter int64
// wg用来等待程序结束
wg sync.WaitGroup
)
// main是所有Go程序的入口
func main() {
// 计数加2,表示要等待两个goroutine
wg.Add(2)
// 创建两个goroutine
go incCounter(1)
go incCounter(2)
// 等待goroutine结束
wg.Wait()
// 显示最终的值
fmt.Println("Final Counter:", counter)
}
// incCounter增加包里counter变量的值
func incCounter(id int) {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
for count := 0; count < 2; count++ {
// 安全地对counter加1
atomic.AddInt64(&counter, 1)
// 当前goroutine从线程退出,并放回到队列
runtime.Gosched()
}
}
这里使用atomic包下的addint64函数,保证同一时刻只有一个goroutine操作,并且它引用原子函数时,goroutine自动做同步,此外还有loadInt64和StoreInt64,分别时安全读写整型值, 并且创建一个同步标志并向不同goroutine通知
// 这个示例程序展示如何使用atomic包里的
// Store和Load类函数来提供对数值类型
// 的安全访问
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
// shutdown是通知正在执行的goroutine停止工作的标志
shutdown int64
// wg用来等待程序结束
wg sync.WaitGroup
)
// main是所有Go程序的入口
func main() {
// 计数加2,表示要等待两个goroutine
wg.Add(2)
// 创建两个goroutine
go doWork("A")
go doWork("B")
// 给定goroutine执行的时间
time.Sleep(1 * time.Second)
// 该停止工作了,安全地设置shutdown标志
fmt.Println("Shutdown Now")
atomic.StoreInt64(&shutdown, 1)
// 等待goroutine结束
wg.Wait()
}
// doWork用来模拟执行工作的goroutine,
// 检测之前的shutdown标志来决定是否提前终止
func doWork(name string) {
defer wg.Done()
for {
fmt.Printf("Doing %s Work\n", name)
time.Sleep(250 * time.Millisecond)
// 要停止工作了吗?
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("Shutting %s Down\n", name)
break
}
}
}
// 在函数退出时调用Done来通知main函数工作已经完成
这里如果有goroutine试图在storeint时loadint,则原子函数会将其同步,保证其不进入竞争状态
互斥锁
// 这个示例程序展示如何使用互斥锁来
// 定义一段需要同步访问的代码临界区
// 资源的同步访问
package main
import (
"fmt"
"runtime"
"sync"
)
var (
// counter是所有goroutine都要增加其值的变量
counter int
// wg用来等待程序结束
wg sync.WaitGroup
// mutex 用来定义一段代码临界区
mutex sync.Mutex
)
// main是所有Go程序的入口
func main() {
// 计数加2,表示要等待两个goroutine
wg.Add(2)
// 创建两个goroutine
go incCounter(1)
go incCounter(2)
// 等待goroutine结束
wg.Wait()
fmt.Printf("Final Counter: %d\\n", counter)
}
// incCounter使用互斥锁来同步并保证安全访问,
// 增加包里counter变量的值
func incCounter(id int) {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
for count := 0; count < 2; count++ {
// 同一时刻只允许一个goroutine进入
// 这个临界区
mutex.Lock()
{
// 捕获counter的值
value := counter
// 当前goroutine从线程退出,并放回到队列
runtime.Gosched()
// 增加本地value变量的值
value++
// 将该值保存回counter
counter = value
}
mutex.Unlock()
// 释放锁,允许其他正在等待的goroutine
// 进入临界区
}
}
互斥锁就是定义一段临时区域,在解锁前只能有一个goroutine运行
通道
原子函数和互斥锁都是传统的解决办法,通道机制则是golang独有的解决办法,可以在goroutine中共享和交换数据
// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)
分别创建了无缓冲和有缓冲的通道,chan关键字是必须的,之后是交换的数据类型,数字是缓冲区的大小
// 有缓冲的字符串通道
buffered := make(chan string, 10)
// 通过通道发送一个字符串
buffered <- "Gopher"
如上是使用< -关键字向缓冲区发送字符串
// 从通道接收一个字符串
value := <-buffered
如上是接收字符串,注意箭头方向
有无缓冲通道
无缓冲的通道是指没有保存能力的通道,发送和接收方需要同时准备好,如图
在通道中,2345步,两个goroutine都是被锁住的
// 这个示例程序展示如何用无缓冲的通道来模拟
// 2个goroutine间的网球比赛
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// wg用来等待程序结束
var wg sync.WaitGroup
func init() {
rand.Seed(time.Now().UnixNano())
}
// main是所有Go程序的入口
func main() {
// 创建一个无缓冲的通道
court := make(chan int)
// 计数加2,表示要等待两个goroutine
wg.Add(2)
// 启动两个选手
go player("Nadal", court)
go player("Djokovic", court)
// 发球
court <- 1
// 等待游戏结束
wg.Wait()
}
// player 模拟一个选手在打网球
func player(name string, court chan int) {
// 在函数退出时调用Done来通知main函数工作已经完成
defer wg.Done()
for {
// 等待球被击打过来
ball, ok := <-court
if !ok {
// 如果通道被关闭,我们就赢了
fmt.Printf("Player %s Won\n", name)
return
}
// 选随机数,然后用这个数来判断我们是否丢球
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
// 关闭通道,表示我们输了
close(court)
return
}
// 显示击球数,并将击球数加1
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
// 将球打向对手
court <- ball
}
}
如图,是一个击球的程序,模拟无缓冲的通道,在击球前等待通道传递数据,每次随机一个数字代表是否得分,得分则传递给通道,否则判定失败,无限循环
// 这个示例程序展示如何用无缓冲的通道来模拟
// 4个goroutine间的接力比赛
package main
import (
"fmt"
"sync"
"time"
)
// wg用来等待程序结束
var wg sync.WaitGroup
// main是所有Go程序的入口
func main() {
// 创建一个无缓冲的通道
baton := make(chan int)
// 为最后一位跑步者将计数加1
wg.Add(1)
// 第一位跑步者持有接力棒
go Runner(baton)
// 开始比赛
baton <- 1
// 等待比赛结束
wg.Wait()
}
// Runner模拟接力比赛中的一位跑步者
func Runner(baton chan int) {
var newRunner int
// 等待接力棒
runner := <-baton
// 开始绕着跑道跑步
fmt.Printf("Runner %d Running With Baton\n", runner)
// 创建下一位跑步者
if runner != 4 {
newRunner = runner + 1
fmt.Printf("Runner %d To The Line\n", newRunner)
go Runner(baton)
}
// 围绕跑道跑
time.Sleep(100 * time.Millisecond)
// 比赛结束了吗?
if runner == 4 {
fmt.Printf("Runner %d Finished, Race Over\n", runner)
wg.Done()
return
}
// 将接力棒交给下一位跑步者
fmt.Printf("Runner %d Exchange With Runner %d\n",
runner,
newRunner)
baton <- newRunner
}
如上是一个模拟接力的程序,开始等待通道传递数据,在几秒后给通道传入数据,知道第四个人,结束循环
这些是无缓冲的通道,必须即放即取,而有缓冲的通道如下,可以异步存取
// 这个示例程序展示如何使用
// 有缓冲的通道和固定数目的
// goroutine来处理一堆工作
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const (
numberGoroutines = 4 // 要使用的goroutine的数量
taskLoad = 10 // 要处理的工作的数量
)
// wg用来等待程序完成
var wg sync.WaitGroup
// init初始化包,Go语言运行时会在其他代码执行之前
// 优先执行这个函数
func init() {
// 初始化随机数种子
rand.Seed(time.Now().Unix())
}
// main是所有Go程序的入口
func main() {
// 创建一个有缓冲的通道来管理工作
tasks := make(chan string, taskLoad)
// 启动goroutine来处理工作
wg.Add(numberGoroutines)
for gr := 1; gr <= numberGoroutines; gr++ {
go worker(tasks, gr)
}
// 增加一组要完成的工作
for post := 1; post <= taskLoad; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}
// 当所有工作都处理完时关闭通道
// 以便所有goroutine退出
close(tasks)
// 等待所有工作完成
wg.Wait()
// 通知函数已经返回
defer wg.Done()
}
// worker作为goroutine启动来处理
// 从有缓冲的通道传入的工作
func worker(tasks chan string, worker int) {
for {
// 等待分配工作
task, ok := <-tasks
if !ok {
// 这意味着通道已经空了,并且已被关闭
fmt.Printf("Worker: %d : Shutting Down\n", worker)
return
}
// 显示我们开始工作了
fmt.Printf("Worker: %d : Started %s\n", worker, task)
// 随机等一段时间来模拟工作
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)
// 显示我们完成了工作
fmt.Printf("Worker: %d : Completed %s\n", worker, task)
}
}
运行结果
这里创建了四个goroutine代表四个人,10个任务,然后使用有缓冲的通道接收10个任务,之后通道会关闭,代表可以发送但不能接收,然后四个goroutine取出任务,直到从通道中接收不到任务,程序终止
有缓冲和无缓冲的最大区别就是是否保证goroutine同时处理任务