Go进阶高并发(多线程)处理教程

发布于:2025-07-27 ⋅ 阅读:(21) ⋅ 点赞:(0)

Go进阶高并发处理教程

目录

  1. Go并发编程基础
  2. Goroutine深入理解
  3. 同步原语详解
  4. 并发模式与最佳实践
  5. 性能优化技巧
  6. 实战案例

Go并发编程基础

什么是并发?

并发是指程序能够同时处理多个任务的能力。Go语言从设计之初就将并发作为核心特性,提供了简洁而强大的并发编程模型。

Go并发模型的优势

  • 轻量级协程:Goroutine比传统线程更轻量
  • CSP模型:通过通信来共享内存,而不是通过共享内存来通信
  • 内置调度器:Go运行时自动管理goroutine的调度

Goroutine深入理解

创建和启动Goroutine

package main

import (
    "fmt"
    "time"
)

func worker(id int) {
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    // 启动多个goroutine
    for i := 1; i <= 5; i++ {
        go worker(i)
    }
    
    // 等待所有goroutine完成
    time.Sleep(2 * time.Second)
    fmt.Println("All workers completed")
}

Goroutine的生命周期

  1. 创建:使用go关键字创建
  2. 调度:由Go调度器管理
  3. 执行:在可用的OS线程上执行
  4. 结束:函数返回时自动结束

调度器工作原理

Go使用M:N调度模型:

  • M:OS线程(Machine)
  • P:处理器(Processor)
  • G:Goroutine
G1  G2  G3  G4
 \   |   |  /
  \  |   | /
   \ |   |/
    \|   |
     P1  P2
     |   |
     M1  M2

同步原语详解

sync.WaitGroup

用于等待一组goroutine完成:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 完成时调用Done()
    
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1) // 增加等待计数
        go worker(i, &wg)
    }
    
    wg.Wait() // 等待所有goroutine完成
    fmt.Println("All workers completed")
}

sync.Mutex

互斥锁用于保护共享资源:

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    
    // 启动100个goroutine同时增加计数器
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                counter.Increment()
            }
        }()
    }
    
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter.Value())
}

sync.RWMutex

读写锁允许多个读操作同时进行:

type SafeMap struct {
    mu sync.RWMutex
    data map[string]int
}

func (sm *SafeMap) Get(key string) (int, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    val, ok := sm.data[key]
    return val, ok
}

func (sm *SafeMap) Set(key string, value int) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.data[key] = value
}

sync.Once

确保某个操作只执行一次:

package main

import (
    "fmt"
    "sync"
)

var once sync.Once
var instance *Singleton

type Singleton struct {
    data string
}

func GetInstance() *Singleton {
    once.Do(func() {
        fmt.Println("Creating singleton instance")
        instance = &Singleton{data: "singleton"}
    })
    return instance
}

func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            s := GetInstance()
            fmt.Printf("Goroutine %d got instance: %s\n", id, s.data)
        }(i)
    }
    
    wg.Wait()
}

并发模式与最佳实践

Worker Pool模式

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Data string
}

type Result struct {
    Job    Job
    Output string
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job.ID)
        time.Sleep(time.Millisecond * 100) // 模拟工作
        
        result := Result{
            Job:    job,
            Output: fmt.Sprintf("Processed by worker %d", id),
        }
        results <- result
    }
}

func main() {
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动worker
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }
    
    // 发送任务
    for i := 1; i <= numJobs; i++ {
        jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}
    }
    close(jobs)
    
    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("Job %d result: %s\n", result.Job.ID, result.Output)
    }
}

扇入扇出模式

// 扇出:将工作分发给多个goroutine
func fanOut(input <-chan int, workers int) []<-chan int {
    outputs := make([]<-chan int, workers)
    
    for i := 0; i < workers; i++ {
        output := make(chan int)
        outputs[i] = output
        
        go func(out chan<- int) {
            defer close(out)
            for n := range input {
                out <- n * n // 计算平方
            }
        }(output)
    }
    
    return outputs
}

// 扇入:将多个channel的结果合并
func fanIn(inputs ...<-chan int) <-chan int {
    output := make(chan int)
    var wg sync.WaitGroup
    
    for _, input := range inputs {
        wg.Add(1)
        go func(in <-chan int) {
            defer wg.Done()
            for n := range in {
                output <- n
            }
        }(input)
    }
    
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

性能优化技巧

1. 合理设置GOMAXPROCS

import "runtime"

func init() {
    // 设置使用的CPU核心数
    runtime.GOMAXPROCS(runtime.NumCPU())
}

2. 避免goroutine泄漏

// 错误示例:可能导致goroutine泄漏
func badExample() {
    ch := make(chan int)
    go func() {
        ch <- 1 // 如果没有接收者,这个goroutine会永远阻塞
    }()
    // 函数返回,但goroutine仍在运行
}

// 正确示例:使用context控制goroutine生命周期
func goodExample(ctx context.Context) {
    ch := make(chan int, 1) // 使用缓冲channel
    go func() {
        select {
        case ch <- 1:
        case <-ctx.Done():
            return
        }
    }()
}

3. 使用对象池减少GC压力

import "sync"

var pool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 1024)
    },
}

func processData(data []byte) {
    buf := pool.Get().([]byte)
    defer pool.Put(buf)
    
    // 使用buf处理数据
}

实战案例

并发HTTP客户端

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

type Result struct {
    URL        string
    StatusCode int
    Duration   time.Duration
    Error      error
}

func fetchURL(url string, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    start := time.Now()
    resp, err := http.Get(url)
    duration := time.Since(start)
    
    result := Result{
        URL:      url,
        Duration: duration,
        Error:    err,
    }
    
    if err == nil {
        result.StatusCode = resp.StatusCode
        resp.Body.Close()
    }
    
    results <- result
}

func main() {
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.golang.org",
    }
    
    results := make(chan Result, len(urls))
    var wg sync.WaitGroup
    
    // 并发请求所有URL
    for _, url := range urls {
        wg.Add(1)
        go fetchURL(url, results, &wg)
    }
    
    // 等待所有请求完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 处理结果
    for result := range results {
        if result.Error != nil {
            fmt.Printf("Error fetching %s: %v\n", result.URL, result.Error)
        } else {
            fmt.Printf("%s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)
        }
    }
}

总结

Go语言的并发编程提供了强大而简洁的工具:

  1. Goroutine:轻量级协程,易于创建和管理
  2. Channel:类型安全的通信机制
  3. sync包:提供各种同步原语
  4. 并发模式:Worker Pool、扇入扇出等经典模式

掌握这些概念和技巧,能够帮助您构建高性能、可扩展的并发应用程序。记住Go的并发哲学:通过通信来共享内存,而不是通过共享内存来通信

参考资源


网站公告

今日签到

点亮在社区的每一天
去签到