Go语言实战-第六章 并发

发布于:2024-09-18 ⋅ 阅读:(240) ⋅ 点赞:(0)

通常程序以独立方式编写,简单易维护,但一些时候处于性能考量就需要并发,go语言的并发是基于csp模型的,通信顺序进程(Communicating Sequential Processes,CSP),不像其他语言对数据加锁,而是通过使用通道channel在gorountine中传递信息

并发与并行

并发的概念是离不开程序进程与线程的概念的,通常进程指一个运行的程序,其中包含了线程,启动时的线程是主线程,操作系统会调度线程,1.5前每个可用的物理处理器分配一个逻辑处理器,1.5后整个应用程序则只分配一个逻辑处理器,来调度所有的goroutine
在这里插入图片描述
如图,调度器将goroutine排列到队列里,绑定线程,将goroutine依次执行,如果系统调用阻塞的话,就新开线程等待返回,同时继续处理下一个routine,而针对网络io的话,阻塞时routine会与逻辑处理器分离,放到网络轮询器的运行时上,当网络轮询器指示操作就绪时,再回到逻辑处理器执行
逻辑处理器数量没有上限,但线程数默认10000,超过就会崩溃,可以通过调用runtime/debug包的SetMaxThreads方法来更改
并发不同于并行,它不是同时执行多个,而是调配管理多个,在合适的时机处理合适的任务,在有限的硬件资源下,具有更好的表现
如果要实现并行,就要有多个逻辑处理器,调度器会平均分配routine到多个线程,并且要有多个物理处理器,这样才能达到并行的效果
在这里插入图片描述
如图是并发与并行的区别

goroutine
// 这个示例程序展示如何创建goroutine
// 以及调度器的行为
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// main是所有Go程序的入口
func main() {
	// 分配一个逻辑处理器给调度器使用
	runtime.GOMAXPROCS(1)

	// wg用来等待程序完成
	// 计数加2,表示要等待两个goroutine
	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Println("Start Goroutines")

	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'a'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	// 等待goroutine结束
	fmt.Println("Waiting To Finish")
	wg.Wait()

	fmt.Println("\nTerminating Program")
}

这里两个匿名函数分别输出26个小写和大写字母,使用wait等待返回后,逻辑调度器停止,运行结果是
在这里插入图片描述
这里由于一个goroutine执行的太快,还没切换到第二个goroutine就已经全部输出完了
其中waitgroup是指两个运行的goroutine,done是通知执行完毕,这样做是防止一个routine执行太久,可以手动停止
如下是一个长时间运行的go示例,可以观察这一现象

// 这个示例程序展示goroutine调度器是如何在单个线程上
// 切分时间片的
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// wg用来等待程序完成
var wg sync.WaitGroup

// main是所有Go程序的入口
func main() {

	// 分配一个逻辑处理器给调度器使用

	runtime.GOMAXPROCS(1)

	// 计数加2,表示要等待两个goroutine
	wg.Add(2)
	// 创建两个goroutine
	fmt.Println("Create Goroutines")
	go printPrime("A")
	go printPrime("B")
	// 等待goroutine结束
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("Terminating Program")
}

// printPrime 显示5000以内的素数值
func printPrime(prefix string) {
	// 在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()
next:
	for outer := 2; outer < 5000; outer++ {
		for inner := 2; inner < outer; inner++ {
			if outer%inner == 0 {
				continue next
			}
		}
		fmt.Printf("%s:%d\n", prefix, outer)
	}
	fmt.Println("Completed", prefix)
}

在这里插入图片描述
在这里插入图片描述
这里goroutine A先查找素数,耗时较长,这里调度器会调度到B,B输出4591时,又被调度器切换到A,然后A输出到4567,又切换到B,这次B输出完之后,线程接收到返回就切换回A并输出完剩下的数
这里import “runtime”
// 给每个可用的核心分配一个逻辑处理器,runtime.GOMAXPROCS(runtime.NumCPU()) ,runtime.NumCPU返回物理cpu的数量

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// main是所有Go程序的入口
func main() {
	// 分配2个逻辑处理器给调度器使用
	runtime.GOMAXPROCS(2)

	// wg用来等待程序完成
	// 计数加2,表示要等待两个goroutine
	var wg sync.WaitGroup
	wg.Add(2)

	fmt.Println("Start Goroutines")

	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'a'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()
	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	// 等待goroutine结束
	fmt.Println("Waiting To Finish")
	wg.Wait()

	fmt.Println("\nTerminating Program")
}

在这里插入图片描述
只有在有多个逻辑处理器且可以同时让每个goroutine运行在一个可用的物理处理器上的时候,goroutine才会并行运行。

竞争状态

并行中,两个多个goroutine访问一个资源,并且没有互相同步,这就是竞争状态,通常这种读写必须是原子化的,即只有一个goroutine同时操作

// 这个示例程序展示如何在程序里造成竞争状态
// 实际上不希望出现这种情况
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// counter是所有goroutine都要增加其值的变量
	counter int

	// wg用来等待程序结束
	wg sync.WaitGroup
)

// main是所有Go程序的入口
func main() {

	// 计数加2,表示要等待两个goroutine
	wg.Add(2)
	// 创建两个goroutine
	go incCounter(1)
	go incCounter(2)
	// 等待goroutine结束
	wg.Wait()
	fmt.Println("Final Counter:", counter)

}

// incCounter增加包里counter变量的值
func incCounter(id int) {
	// 在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()
	for count := 0; count < 2; count++ {
		// 捕获counter的值
		value := counter
		// 当前goroutine从线程退出,并放回到队列
		runtime.Gosched()
		// 增加本地value变量的值
		value++
		// 将该值保存回counter
		counter = value
	}

}

在这里插入图片描述
如图,这个竞争状态下,两个goroutine值没有叠加,而是各自计算,互相覆盖,每个线程都会将值存到一个副本value中,然后再写入,goshed是将goroutine退出线程,让其他goroutine可以运行,便于观察
go语言有race工具可以用来观察竞争

go build -race   // 用竞争检测器标志来编译程序
./example  
// 运行程序 


================== 
WARNING: DATA RACE 
Write by goroutine 5: 
main.incCounter() 
/example/main.go:49 +0x96 
Previous read by goroutine 6: 
main.incCounter() 
/example/main.go:40 +0x66 
Goroutine 5 (running) created at: 
main.main() 
/example/main.go:25 +0x5c 
Goroutine 6 (running) created at: 
main.main() 
/example/main.go:26 +0x73 
================== 
Final Counter: 2 
Found 1 data race(s) 

上一部分指出了有竞争状态的代码,分别是读写counter操作,可以通过锁来解决竞争问题

锁与原子函数

原子函数通过底层原理来规定访问资源的顺序

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	// counter是所有goroutine都要增加其值的变量
	counter int64

	// wg用来等待程序结束
	wg sync.WaitGroup
)

// main是所有Go程序的入口
func main() {
	// 计数加2,表示要等待两个goroutine
	wg.Add(2)
	// 创建两个goroutine
	go incCounter(1)
	go incCounter(2)
	// 等待goroutine结束
	wg.Wait()
	// 显示最终的值
	fmt.Println("Final Counter:", counter)
}

// incCounter增加包里counter变量的值
func incCounter(id int) {
	// 在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 安全地对counter加1
		atomic.AddInt64(&counter, 1)

		// 当前goroutine从线程退出,并放回到队列
		runtime.Gosched()
	}
}

在这里插入图片描述
这里使用atomic包下的addint64函数,保证同一时刻只有一个goroutine操作,并且它引用原子函数时,goroutine自动做同步,此外还有loadInt64和StoreInt64,分别时安全读写整型值, 并且创建一个同步标志并向不同goroutine通知

// 这个示例程序展示如何使用atomic包里的
// Store和Load类函数来提供对数值类型
// 的安全访问
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// shutdown是通知正在执行的goroutine停止工作的标志
	shutdown int64

	// wg用来等待程序结束
	wg sync.WaitGroup
)

// main是所有Go程序的入口
func main() {
	// 计数加2,表示要等待两个goroutine
	wg.Add(2)

	// 创建两个goroutine
	go doWork("A")
	go doWork("B")

	// 给定goroutine执行的时间
	time.Sleep(1 * time.Second)

	// 该停止工作了,安全地设置shutdown标志
	fmt.Println("Shutdown Now")
	atomic.StoreInt64(&shutdown, 1)

	// 等待goroutine结束
	wg.Wait()

}

// doWork用来模拟执行工作的goroutine,
// 检测之前的shutdown标志来决定是否提前终止
func doWork(name string) {
	defer wg.Done()
	for {
		fmt.Printf("Doing %s Work\n", name)
		time.Sleep(250 * time.Millisecond)
		// 要停止工作了吗?
		if atomic.LoadInt64(&shutdown) == 1 {
			fmt.Printf("Shutting %s Down\n", name)
			break
		}
	}
}

// 在函数退出时调用Done来通知main函数工作已经完成

这里如果有goroutine试图在storeint时loadint,则原子函数会将其同步,保证其不进入竞争状态

互斥锁
// 这个示例程序展示如何使用互斥锁来
// 定义一段需要同步访问的代码临界区
// 资源的同步访问
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// counter是所有goroutine都要增加其值的变量
	counter int

	// wg用来等待程序结束
	wg sync.WaitGroup

	// mutex 用来定义一段代码临界区
	mutex sync.Mutex
)

// main是所有Go程序的入口
func main() {
	// 计数加2,表示要等待两个goroutine
	wg.Add(2)

	// 创建两个goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待goroutine结束
	wg.Wait()
	fmt.Printf("Final Counter: %d\\n", counter)
}

// incCounter使用互斥锁来同步并保证安全访问,
// 增加包里counter变量的值
func incCounter(id int) {
	// 在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 同一时刻只允许一个goroutine进入
		// 这个临界区
		mutex.Lock()
		{
			// 捕获counter的值
			value := counter

			// 当前goroutine从线程退出,并放回到队列
			runtime.Gosched()

			// 增加本地value变量的值
			value++

			// 将该值保存回counter
			counter = value
		}
		mutex.Unlock()
		// 释放锁,允许其他正在等待的goroutine
		// 进入临界区
	}
}

互斥锁就是定义一段临时区域,在解锁前只能有一个goroutine运行

通道

原子函数和互斥锁都是传统的解决办法,通道机制则是golang独有的解决办法,可以在goroutine中共享和交换数据

// 无缓冲的整型通道 
unbuffered := make(chan int) 
// 有缓冲的字符串通道 
buffered := make(chan string, 10) 

分别创建了无缓冲和有缓冲的通道,chan关键字是必须的,之后是交换的数据类型,数字是缓冲区的大小

// 有缓冲的字符串通道 
buffered := make(chan string, 10) 
// 通过通道发送一个字符串 
buffered <- "Gopher"

如上是使用< -关键字向缓冲区发送字符串

// 从通道接收一个字符串 
value := <-buffered 

如上是接收字符串,注意箭头方向

有无缓冲通道

无缓冲的通道是指没有保存能力的通道,发送和接收方需要同时准备好,如图
在这里插入图片描述
在通道中,2345步,两个goroutine都是被锁住的

// 这个示例程序展示如何用无缓冲的通道来模拟
// 2个goroutine间的网球比赛
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// wg用来等待程序结束
var wg sync.WaitGroup

func init() {
	rand.Seed(time.Now().UnixNano())
}

// main是所有Go程序的入口
func main() {
	// 创建一个无缓冲的通道
	court := make(chan int)

	// 计数加2,表示要等待两个goroutine
	wg.Add(2)

	// 启动两个选手
	go player("Nadal", court)
	go player("Djokovic", court)

	// 发球
	court <- 1

	// 等待游戏结束
	wg.Wait()

}

// player 模拟一个选手在打网球
func player(name string, court chan int) {

	// 在函数退出时调用Done来通知main函数工作已经完成
	defer wg.Done()
	for {
		// 等待球被击打过来
		ball, ok := <-court
		if !ok {
			// 如果通道被关闭,我们就赢了
			fmt.Printf("Player %s Won\n", name)
			return
		}
		// 选随机数,然后用这个数来判断我们是否丢球
		n := rand.Intn(100)
		if n%13 == 0 {
			fmt.Printf("Player %s Missed\n", name)
			// 关闭通道,表示我们输了
			close(court)
			return
		}
		// 显示击球数,并将击球数加1
		fmt.Printf("Player %s Hit %d\n", name, ball)
		ball++
		// 将球打向对手
		court <- ball
	}
}

在这里插入图片描述
如图,是一个击球的程序,模拟无缓冲的通道,在击球前等待通道传递数据,每次随机一个数字代表是否得分,得分则传递给通道,否则判定失败,无限循环

// 这个示例程序展示如何用无缓冲的通道来模拟
// 4个goroutine间的接力比赛
package main

import (
	"fmt"
	"sync"
	"time"
)

// wg用来等待程序结束
var wg sync.WaitGroup

// main是所有Go程序的入口
func main() {
	// 创建一个无缓冲的通道
	baton := make(chan int)

	// 为最后一位跑步者将计数加1
	wg.Add(1)

	// 第一位跑步者持有接力棒
	go Runner(baton)

	// 开始比赛
	baton <- 1

	// 等待比赛结束
	wg.Wait()
}

// Runner模拟接力比赛中的一位跑步者
func Runner(baton chan int) {
	var newRunner int

	// 等待接力棒
	runner := <-baton
	// 开始绕着跑道跑步
	fmt.Printf("Runner %d Running With Baton\n", runner)
	// 创建下一位跑步者
	if runner != 4 {
		newRunner = runner + 1
		fmt.Printf("Runner %d To The Line\n", newRunner)
		go Runner(baton)
	}
	// 围绕跑道跑
	time.Sleep(100 * time.Millisecond)
	// 比赛结束了吗?
	if runner == 4 {
		fmt.Printf("Runner %d Finished, Race Over\n", runner)
		wg.Done()
		return
	}
	// 将接力棒交给下一位跑步者
	fmt.Printf("Runner %d Exchange With Runner %d\n",
		runner,
		newRunner)
	baton <- newRunner

}

如上是一个模拟接力的程序,开始等待通道传递数据,在几秒后给通道传入数据,知道第四个人,结束循环
这些是无缓冲的通道,必须即放即取,而有缓冲的通道如下,可以异步存取
在这里插入图片描述

// 这个示例程序展示如何使用
// 有缓冲的通道和固定数目的
// goroutine来处理一堆工作
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	numberGoroutines = 4  // 要使用的goroutine的数量
	taskLoad         = 10 // 要处理的工作的数量
)

// wg用来等待程序完成
var wg sync.WaitGroup

// init初始化包,Go语言运行时会在其他代码执行之前
// 优先执行这个函数
func init() {
	// 初始化随机数种子
	rand.Seed(time.Now().Unix())
}

// main是所有Go程序的入口
func main() {
	// 创建一个有缓冲的通道来管理工作
	tasks := make(chan string, taskLoad)

	// 启动goroutine来处理工作
	wg.Add(numberGoroutines)
	for gr := 1; gr <= numberGoroutines; gr++ {
		go worker(tasks, gr)
	}

	// 增加一组要完成的工作
	for post := 1; post <= taskLoad; post++ {
		tasks <- fmt.Sprintf("Task : %d", post)
	}

	// 当所有工作都处理完时关闭通道
	// 以便所有goroutine退出
	close(tasks)

	// 等待所有工作完成
	wg.Wait()
	// 通知函数已经返回
	defer wg.Done()
}

// worker作为goroutine启动来处理
// 从有缓冲的通道传入的工作
func worker(tasks chan string, worker int) {
	for {
		// 等待分配工作
		task, ok := <-tasks
		if !ok {
			// 这意味着通道已经空了,并且已被关闭
			fmt.Printf("Worker: %d : Shutting Down\n", worker)
			return
		}
		// 显示我们开始工作了
		fmt.Printf("Worker: %d : Started %s\n", worker, task)
		// 随机等一段时间来模拟工作
		sleep := rand.Int63n(100)
		time.Sleep(time.Duration(sleep) * time.Millisecond)
		// 显示我们完成了工作
		fmt.Printf("Worker: %d : Completed %s\n", worker, task)
	}
}

运行结果
在这里插入图片描述
这里创建了四个goroutine代表四个人,10个任务,然后使用有缓冲的通道接收10个任务,之后通道会关闭,代表可以发送但不能接收,然后四个goroutine取出任务,直到从通道中接收不到任务,程序终止
有缓冲和无缓冲的最大区别就是是否保证goroutine同时处理任务