Go进阶高并发处理教程
目录
Go并发编程基础
什么是并发?
并发是指程序能够同时处理多个任务的能力。Go语言从设计之初就将并发作为核心特性,提供了简洁而强大的并发编程模型。
Go并发模型的优势
- 轻量级协程:Goroutine比传统线程更轻量
- CSP模型:通过通信来共享内存,而不是通过共享内存来通信
- 内置调度器:Go运行时自动管理goroutine的调度
Goroutine深入理解
创建和启动Goroutine
package main
import (
"fmt"
"time"
)
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
// 启动多个goroutine
for i := 1; i <= 5; i++ {
go worker(i)
}
// 等待所有goroutine完成
time.Sleep(2 * time.Second)
fmt.Println("All workers completed")
}
Goroutine的生命周期
- 创建:使用
go
关键字创建 - 调度:由Go调度器管理
- 执行:在可用的OS线程上执行
- 结束:函数返回时自动结束
调度器工作原理
Go使用M:N调度模型:
- M:OS线程(Machine)
- P:处理器(Processor)
- G:Goroutine
G1 G2 G3 G4
\ | | /
\ | | /
\ | |/
\| |
P1 P2
| |
M1 M2
同步原语详解
sync.WaitGroup
用于等待一组goroutine完成:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 完成时调用Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // 增加等待计数
go worker(i, &wg)
}
wg.Wait() // 等待所有goroutine完成
fmt.Println("All workers completed")
}
sync.Mutex
互斥锁用于保护共享资源:
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
// 启动100个goroutine同时增加计数器
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
counter.Increment()
}
}()
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter.Value())
}
sync.RWMutex
读写锁允许多个读操作同时进行:
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func (sm *SafeMap) Get(key string) (int, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
val, ok := sm.data[key]
return val, ok
}
func (sm *SafeMap) Set(key string, value int) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.data[key] = value
}
sync.Once
确保某个操作只执行一次:
package main
import (
"fmt"
"sync"
)
var once sync.Once
var instance *Singleton
type Singleton struct {
data string
}
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating singleton instance")
instance = &Singleton{data: "singleton"}
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
s := GetInstance()
fmt.Printf("Goroutine %d got instance: %s\n", id, s.data)
}(i)
}
wg.Wait()
}
并发模式与最佳实践
Worker Pool模式
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Output string
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
time.Sleep(time.Millisecond * 100) // 模拟工作
result := Result{
Job: job,
Output: fmt.Sprintf("Processed by worker %d", id),
}
results <- result
}
}
func main() {
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 启动worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}
}
close(jobs)
// 等待所有worker完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Job %d result: %s\n", result.Job.ID, result.Output)
}
}
扇入扇出模式
// 扇出:将工作分发给多个goroutine
func fanOut(input <-chan int, workers int) []<-chan int {
outputs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
output := make(chan int)
outputs[i] = output
go func(out chan<- int) {
defer close(out)
for n := range input {
out <- n * n // 计算平方
}
}(output)
}
return outputs
}
// 扇入:将多个channel的结果合并
func fanIn(inputs ...<-chan int) <-chan int {
output := make(chan int)
var wg sync.WaitGroup
for _, input := range inputs {
wg.Add(1)
go func(in <-chan int) {
defer wg.Done()
for n := range in {
output <- n
}
}(input)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
性能优化技巧
1. 合理设置GOMAXPROCS
import "runtime"
func init() {
// 设置使用的CPU核心数
runtime.GOMAXPROCS(runtime.NumCPU())
}
2. 避免goroutine泄漏
// 错误示例:可能导致goroutine泄漏
func badExample() {
ch := make(chan int)
go func() {
ch <- 1 // 如果没有接收者,这个goroutine会永远阻塞
}()
// 函数返回,但goroutine仍在运行
}
// 正确示例:使用context控制goroutine生命周期
func goodExample(ctx context.Context) {
ch := make(chan int, 1) // 使用缓冲channel
go func() {
select {
case ch <- 1:
case <-ctx.Done():
return
}
}()
}
3. 使用对象池减少GC压力
import "sync"
var pool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
func processData(data []byte) {
buf := pool.Get().([]byte)
defer pool.Put(buf)
// 使用buf处理数据
}
实战案例
并发HTTP客户端
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
type Result struct {
URL string
StatusCode int
Duration time.Duration
Error error
}
func fetchURL(url string, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
start := time.Now()
resp, err := http.Get(url)
duration := time.Since(start)
result := Result{
URL: url,
Duration: duration,
Error: err,
}
if err == nil {
result.StatusCode = resp.StatusCode
resp.Body.Close()
}
results <- result
}
func main() {
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.golang.org",
}
results := make(chan Result, len(urls))
var wg sync.WaitGroup
// 并发请求所有URL
for _, url := range urls {
wg.Add(1)
go fetchURL(url, results, &wg)
}
// 等待所有请求完成
go func() {
wg.Wait()
close(results)
}()
// 处理结果
for result := range results {
if result.Error != nil {
fmt.Printf("Error fetching %s: %v\n", result.URL, result.Error)
} else {
fmt.Printf("%s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)
}
}
}
总结
Go语言的并发编程提供了强大而简洁的工具:
- Goroutine:轻量级协程,易于创建和管理
- Channel:类型安全的通信机制
- sync包:提供各种同步原语
- 并发模式:Worker Pool、扇入扇出等经典模式
掌握这些概念和技巧,能够帮助您构建高性能、可扩展的并发应用程序。记住Go的并发哲学:通过通信来共享内存,而不是通过共享内存来通信。