go routine 并发和同步

发布于:2025-03-07 ⋅ 阅读:(12) ⋅ 点赞:(0)

sync.WaitGroup

sync.WaitGroup 是 Go 语言中的一个并发原语,用于等待一组协程(goroutine)完成。在使用多个协程时,WaitGroup 可以帮助你确保所有协程都执行完毕后再继续执行主程序。以下是 sync.WaitGroup 的基本用法和示例:
下面是一个简单的示例,展示如何使用 sync.WaitGroup 来等待多个协程完成:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 在函数退出时调用 Done

    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Second) // 模拟工作
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    const numWorkers = 5
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1) // 启动一个新协程前增加计数
        go worker(i, &wg)
    }

    wg.Wait() // 等待所有协程完成
    fmt.Println("All workers done")
}

注意事项

  • 正确使用 Add 和 Done:确保在启动协程之前调用 Add,并在协程结束时调用 Done,否则可能导致死锁或计数错误。
  • 传递指针:WaitGroup 通常作为指针传递给协程,以避免复制并确保对同一个 WaitGroup 实例进行操作。
  • 不要复制 WaitGroup:WaitGroup 不应该被复制,因为它内部包含一个计数器,复制会导致计数器不一致。

sync.Mutex

sync.Mutex 是 Go 语言中的一种互斥锁,用于在多协程环境中保护共享资源,防止数据竞争。当多个协程需要访问同一个共享变量时,可以使用 Mutex 来确保同时只有一个协程能够访问该变量。

下面是一个简单的示例,展示如何使用 sync.Mutex 来保护对共享变量的访问:
package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    counter int
    mu      sync.Mutex
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < 5; i++ {
        mu.Lock() // 加锁
        counter++
        fmt.Printf("Worker %d incremented counter to %d\n", id, counter)
        mu.Unlock() // 解锁
        time.Sleep(time.Millisecond * 100) // 模拟工作
    }
}

func main() {
    var wg sync.WaitGroup

    const numWorkers = 3
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter)
}

sync.Cond

sync.Cond 是 Go 语言中的一个同步原语,用于在 goroutine 之间协调执行顺序。它提供了一种 goroutine 可以等待某个条件满足的机制,并在条件满足时通知等待的 goroutine。

以下是一个简单的例子,展示了如何使用 sync.Cond 来实现生产者-消费者模型:

package main

import (
	"fmt"
	"sync"
	"time"
)

type Queue struct {
	items []int
	cond  *sync.Cond
}

func NewQueue() *Queue {
	return &Queue{
		items: make([]int, 0),
		cond:  sync.NewCond(&sync.Mutex{}),
	}
}

func (q *Queue) Enqueue(item int) {
	q.cond.L.Lock()
	q.items = append(q.items, item)
	fmt.Printf("Produced: %d\n", item)
	q.cond.L.Unlock()
	q.cond.Signal() // Notify one waiting goroutine that an item has been added
}

func (q *Queue) Dequeue() int {
	q.cond.L.Lock()
	for len(q.items) == 0 {
		q.cond.Wait() // Wait until there is an item to consume
	}
	item := q.items[0]
	q.items = q.items[1:]
	q.cond.L.Unlock()
	fmt.Printf("Consumed: %d\n", item)
	return item
}

func main() {
	queue := NewQueue()

	// Producer
	go func() {
		for i := 0; i < 5; i++ {
			queue.Enqueue(i)
			time.Sleep(time.Second)
		}
	}()

	// Consumer
	go func() {
		for i := 0; i < 5; i++ {
			queue.Dequeue()
			time.Sleep(2 * time.Second)
		}
	}()

	// Wait for goroutines to finish
	time.Sleep(10 * time.Second)
}


代码说明:
Queue 结构体:包含一个整数切片 items 和一个 sync.Cond。sync.Cond 用于在队列为空时阻塞消费者 goroutine。
NewQueue 函数:初始化一个 Queue 实例,使用 sync.NewCond 创建一个新的条件变量,绑定一个 sync.Mutex。
Enqueue 方法:
    加锁后将新项添加到队列,并使用 Signal 方法通知一个等待的 goroutine 有新数据可用。
    Signal 方法只唤醒一个等待的 goroutine。如果需要唤醒所有等待的 goroutine,可以使用 Broadcast 
Dequeue 方法:
    加锁后检查队列是否为空,如果为空则调用 Wait 方法阻塞当前 goroutine,直到被 Signal 或 Broadcast 唤醒。
    在被唤醒后,继续执行,取出队列的第一个
生产者和消费者 goroutine:
    生产者定期向队列中添加数据。
    消费者从队列中取数据并
通过这种方式,sync.Cond 可以有效地协调多个 goroutine 的执行顺序,确保生产者和消费者之间的正确同步。

errgroup

errgroup 是 Go 语言标准库中的一个包,提供了一种方便的方式来管理一组并发任务,并收集它们中的第一个错误。它通常用于需要并发执行多个任务并等待所有任务完成的场景。

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)

func main() {
	// 创建一个带有上下文的 errgroup
	g, ctx := errgroup.WithContext(context.Background())

	// 模拟的任务列表
	tasks := []int{1, 2, 3, 4, 5}

	// 为每个任务启动一个 goroutine
	for _, task := range tasks {
		// 捕获 task 变量
		task := task

		g.Go(func() error {
			// 模拟任务执行
			fmt.Printf("Starting task %d\n", task)
			time.Sleep(time.Duration(task) * time.Second)

			// 模拟一个错误发生在任务 3
			if task == 3 {
				return errors.New("error in task 3")
			}

			fmt.Printf("Completed task %d\n", task)
			return nil
		})
	}

	// 等待所有任务完成
	if err := g.Wait(); err != nil {
		fmt.Printf("Encountered error: %v\n", err)
	} else {
		fmt.Println("All tasks completed successfully")
	}
}

golang type optionFunc func(*config) 的用法

在 Go 语言中,type optionFunc func(*config) 定义了一种函数类型 optionFunc,它是一个函数签名,接受一个指向 config 结构体的指针作为参数,并且没有返回值。这个模式通常用于实现可选配置(option pattern),使得代码在创建对象时可以灵活地应用不同的配置选项。

这种模式通常用于构建器模式(builder pattern)或函数选项模式(functional options pattern),以便为对象提供可选的配置参数,而不是通过大量的构造函数或初始化函数来处理各种配置组合。

package main

import (
    "fmt"
)

// 定义一个配置结构体
type config struct {
    host string
    port int
}

// 定义一个函数类型,用于修改配置
type optionFunc func(*config)

// 应用选项函数以修改配置
func newConfig(opts ...optionFunc) *config {
    cfg := &config{
        host: "localhost", // 默认值
        port: 8080,        // 默认值
    }
    for _, opt := range opts {
        opt(cfg)
    }
    return cfg
}

// 定义具体的选项函数
func withHost(host string) optionFunc {
    return func(cfg *config) {
        cfg.host = host
    }
}

func withPort(port int) optionFunc {
    return func(cfg *config) {
        cfg.port = port
    }
}

func main() {
    // 创建一个配置对象,使用默认值
    defaultConfig := newConfig()
    fmt.Printf("Default config: host=%s, port=%d\n", defaultConfig.host, defaultConfig.port)

    // 创建一个配置对象,使用自定义选项
    customConfig := newConfig(
        withHost("example.com"),
        withPort(9090),
    )
    fmt.Printf("Custom config: host=%s, port=%d\n", customConfig.host, customConfig.port)
}
代码说明
配置结构体: config 结构体包含两个字段 host 和 port,用来存储配置数据。、
选项函数类型: optionFunc 是一个函数类型,定义了接受一个 *config 类型的指针参数的函数。
创建配置函数: newConfig 函数接受可变参数 opts ...optionFunc,这些参数用于修改默认配置。
选项函数: withHost 和 withPort 是两个具体的选项函数,返回一个 optionFunc 类型的函数,用于修改配置中的 host 和 port。
使用示例: 在 main 函数中,演示了如何使用默认配置和自定义选项来创建配置对象。

这种模式的优点是可以灵活地添加新的配置选项,而不需要修改现有的代码结构,使得代码更加可维护和可扩展。