目录
Go语言提供了多种同步机制和锁,用于控制多个goroutine之间的协作和共享资源的访问,确保数据的安全和一致性。以下是对Go语言中sync
包中的主要同步原语和锁的详细讲解,以及完整的代码示例。
一、sync包简介
sync包提供了基本的同步原语,包括:
- Mutex:互斥锁,用于保护临界区,防止多个goroutine同时访问共享资源。
- RWMutex:读写互斥锁,允许多个读者同时访问共享资源,而写者必须独占。
- WaitGroup:用于等待一组goroutine完成。
- Cond:条件变量,用于在goroutine之间协调事件发生的顺序。
- Map:一个并发安全的map,适合频繁的读写场景。
- Pool:对象池,用于管理一组可重用的对象,减少内存分配和垃圾回收的开销。
二、Mutex(互斥锁)
功能特点
- 互斥性:同时只能有一个goroutine持有Mutex。
- 阻塞与等待:如果Mutex已被锁定,尝试获取Mutex的goroutine将被阻塞,直到Mutex被释放。
- 手动控制:需要显式调用
Lock()
和Unlock()
方法来获取和释放锁。
常见用法
- 保护共享变量:防止多个goroutine同时修改共享变量导致数据竞态。
- 控制对资源的并发访问:如文件句柄、网络连接等。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
var (
mu sync.Mutex
counter int
)
func increment() {
for i := 0; i < 5; i++ {
// 获取Mutex
mu.Lock()
// 保证只有一个goroutine可以执行这段代码
fmt.Printf("goroutine %d: counter = %d\n", i, counter)
counter++
// 释放Mutex
mu.Unlock()
// 模拟其他任务
time.Sleep(100 * time.Millisecond)
}
}
func main() {
var wg sync.WaitGroup
// 启动5个goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
increment()
}(i)
}
// 等待所有goroutine完成
wg.Wait()
fmt.Println("Final counter:", counter)
}
三、RWMutex(读写互斥锁)
功能特点
- 读优先:允许多个读者同时访问共享资源。
- 写独占:当有写者需要修改共享资源时,必须等待所有读者完成后才能获得锁。
- 高效并发:适用于读多写少的场景,提升性能。
常见用法
- 保护读多写少的共享数据:如配置信息、缓存等。
- 减少写操作对读操作的影响。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
var (
mu sync.RWMutex
cache = map[string]string{"key": "value"}
)
func readData(key string) {
mu.RLock() // 获取读锁
defer mu.RUnlock() // 确保锁能被释放
fmt.Printf("Reading %s: value = %s\n", key, cache[key])
time.Sleep(500 * time.Millisecond)
}
func writeData(key, value string) {
mu.Lock() // 获取写锁
defer mu.Unlock() // 确保锁能被释放
cache[key] = value
fmt.Printf("Updating %s to %s\n", key, value)
time.Sleep(500 * time.Millisecond)
}
func main() {
var wg sync.WaitGroup
// 启动5个读者
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
readData(fmt.Sprintf("key-%d", i))
}(i)
}
// 启动一个写者
go func() {
writeData("commonKey", "newValue")
}()
// 等待所有goroutine完成
wg.Wait()
fmt.Println("Final cache:", cache)
}
四、WaitGroup(等待组)
功能特点
- 等待多个goroutine完成:用于父goroutine等待子goroutine全部完成。
- 安全地管理goroutine的完成状态。
常见用法
- 协调多个任务:确保所有的子任务完成后再继续执行主任务。
- 父goroutine等待子goroutine:避免主程序过早退出。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
func task(id int) {
defer fmt.Printf("Task %d completed\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Task %d is running\n", id)
}
func main() {
var wg sync.WaitGroup
fmt.Println("Starting tasks...")
for i := 1; i <= 3; i++ {
wg.Add(1)
go task(i)
}
fmt.Println("Waiting for tasks to complete...")
wg.Wait()
fmt.Println("All tasks completed!")
}
五、Cond(条件变量)
功能特点
- 协调goroutine之间的事件发生顺序。
- 类似于其他语言中的条件变量,用于在线程之间协调事件发生的顺序。
常见用法
- 等待特定条件满足:如等待某个事件发生。
- 在生产者-消费者模型中:用来协调生产者和消费者之间的数据传递。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
var cond *sync.Cond
func producer() {
for i := 0; i < 5; i++ {
fmt.Println("Producing", i)
cond.Broadcast() // 通知所有等待的goroutine
time.Sleep(500 * time.Millisecond)
}
cond.Broadcast()
}
func consumer(id int) {
for {
fmt.Printf("Consumer %d waiting...\n", id)
cond.Wait() // 等待条件被满足
fmt.Printf("Consumer %d received\n", id)
}
}
func main() {
var mu sync.Mutex
cond = sync.NewCond(&mu)
var wg sync.WaitGroup
// 启动5个消费者
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
consumer(id)
}(i)
}
// 启动生产者
go producer()
// 等待所有消费者完成
wg.Wait()
fmt.Println("All consumers completed!")
}
六、Map(同步的并发安全的map)
功能特点
- 并发安全:适用于多个goroutine同时读写的场景。
- 高效:比使用Mutex保护一个map更高效。
- 便利:不需要自己加锁解锁,使用起来更为方便。
常见用法
- 高频率的读写操作:适用于需要频繁访问的数据结构。
- 多goroutine同时操作共享的键值对。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
var safeMap = &sync.Map{}
func main() {
var wg sync.WaitGroup
// 写入数据
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i)
safeMap.Store(key, i)
fmt.Printf("Stored %s -> %d\n", key, i)
time.Sleep(100 * time.Millisecond)
}(i)
}
// 读取数据
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i)
value, ok := safeMap.Load(key)
if ok {
fmt.Printf("Retrieved %s -> %d\n", key, value)
} else {
fmt.Printf("Key %s not found\n", key)
}
time.Sleep(100 * time.Millisecond)
}(i)
}
// 等待所有goroutine完成
fmt.Println("Waiting for all goroutines to complete...")
wg.Wait()
}
七、Pool(对象池)
功能特点
- 对象复用:减少对象的创建和销毁,提升性能。
- 资源管理:适用于需要频繁获取和释放资源的场景。
常见用法
- 数据库连接池:管理多个连接,提高数据库访问的效率。
- 内存分配优化:减少垃圾回收的开销。
示例代码
package main
import (
"fmt"
"sync"
"time"
)
type ReusableObject struct {
id int
}
var pool *sync.Pool
func init() {
pool = &sync.Pool{
New: func() interface{} {
return &ReusableObject{
id: 0,
}
},
}
}
func worker(id int) {
obj := pool.Get().(*ReusableObject)
obj.id = id
fmt.Printf("Worker %d is using object with id %d\n", id, obj.id)
time.Sleep(500 * time.Millisecond)
pool.Put(obj)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id)
}(i)
}
fmt.Println("Waiting for all workers to complete...")
wg.Wait()
}
八、总结
Go语言通过sync
包提供了丰富的同步原语和锁,帮助开发者在并发编程中确保数据的安全和一致性。通过理解和掌握这些工具,可以有效避免数据竞态和死锁等问题,编写出高效、可靠的并发程序。
选择合适的同步机制:
- Mutex:适用于多个goroutine竞争独占资源的场景。
- RWMutex:适用于读多写少的场景,提升并发性能。
- WaitGroup:用于等待多个goroutine完成,协调程序流程。
- Cond:用于在线程之间协调特定事件的发生顺序。
- Map:适用于频繁的并发读写场景,高效且安全。
- Pool:用于对象的复用,优化内存分配和管理。
综合应用示例:在线商店系统
以下是一个综合应用示例,模拟一个简单的在线商店系统,展示了Go语言中sync
包中各种同步机制的实际应用。这个系统包括产品目录管理、订单处理、用户会话管理等功能,涵盖了Mutex、RWMutex、Cond、Pool和sync.Map的使用。
功能概述
- 产品目录:维护一系列产品信息,支持读取和更新操作。
- 订单队列:管理客户的订单,确保订单的安全处理。
- 数据库连接池:优化数据库连接的使用,提升效率。
- 用户会话管理:维护用户的登录状态,确保会话的安全性。
代码实现
package main
import (
"fmt"
"sync"
"time"
)
// Product 代表产品信息
type Product struct {
ID string
Name string
Price float64
Stock int
}
// ProductCatalog 产品目录,使用RWMutex进行同步控制
type ProductCatalog struct {
products map[string]Product
mu sync.RWMutex
}
// NewProductCatalog 创建新的产品目录
func NewProductCatalog() *ProductCatalog {
return &ProductCatalog{
products: make(map[string]Product),
mu: sync.RWMutex{},
}
}
// AddProduct 添加产品到目录中
func (c *ProductCatalog) AddProduct(p Product) {
c.mu.Lock()
defer c.mu.Unlock()
c.products[p.ID] = p
fmt.Printf("Product %s added successfully\n", p.Name)
}
// GetProduct 通过ID获取产品信息
func (c *ProductCatalog) GetProduct(id string) (Product, error) {
c.mu.RLock()
defer c.mu.RUnlock()
p, ok := c.products[id]
if !ok {
return Product{}, fmt.Errorf("product %s not found", id)
}
return p, nil
}
// UpdateProduct 更新产品信息
func (c *ProductCatalog) UpdateProduct(id string, name string, price float64) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.products[id]; ok {
c.products[id] = Product{ID: id, Name: name, Price: price, Stock: c.products[id].Stock}
fmt.Printf("Product %s updated successfully\n", id)
} else {
fmt.Printf("Product %s not found\n", id)
}
}
// Order 代表客户订单
type Order struct {
ID string
ProductID string
Quantity int
Status string
}
// OrderQueue 订单队列,使用Mutex和Cond进行同步控制
type OrderQueue struct {
orders []Order
mu sync.Mutex
cond *sync.Cond
}
// NewOrderQueue 创建新的订单队列
func NewOrderQueue() *OrderQueue {
return &OrderQueue{
orders: make([]Order, 0),
mu: sync.Mutex{},
cond: sync.NewCond(&sync.Mutex{}),
}
}
// AddOrder 添加订单到队列
func (q *OrderQueue) AddOrder(order Order) {
q.mu.Lock()
defer q.mu.Unlock()
q.orders = append(q.orders, order)
q.cond.Broadcast()
fmt.Printf("Order %s added to queue\n", order.ID)
}
// ProcessOrder 处理订单队列中的订单
func (q *OrderQueue) ProcessOrder() {
for {
q.cond.L.Lock()
for len(q.orders) == 0 {
fmt.Println("No orders to process. Waiting...")
q.cond.L.Wait()
}
q.cond.L.Unlock()
q.mu.Lock()
order := q.orders[0]
q.orders = q.orders[1:]
q.mu.Unlock()
fmt.Printf("Processing order %s\n", order.ID)
time.Sleep(2 * time.Second)
order.Status = "completed"
fmt.Printf("Order %s processed successfully\n", order.ID)
}
}
// DatabaseConnection 数据库连接对象
type DatabaseConnection struct {
ID int
Free bool
}
// DatabasePool 数据库连接池,使用Pool进行管理
type DatabasePool struct {
pool *sync.Pool
}
// NewDatabasePool 创建新的数据库连接池
func NewDatabasePool(initialCount int) *DatabasePool {
return &DatabasePool{
pool: sync.Pool{
New: func() interface{} {
return &DatabaseConnection{ID: 0, Free: true}
},
},
}
}
// GetConnection 从池中获取一个可用的数据库连接
func (d *DatabasePool) GetConnection() *DatabaseConnection {
conn := d.pool.Get().(*DatabaseConnection)
conn.Free = false
fmt.Printf("Get connection %d\n", conn.ID)
return conn
}
// ReturnConnection 将连接返回到池中
func (d *DatabasePool) ReturnConnection(conn *DatabaseConnection) {
conn.Free = true
d.pool.Put(conn)
fmt.Printf("Return connection %d\n", conn.ID)
}
// UserSession 用户会话信息,使用sync.Map进行管理
type UserSession struct {
userID string
data map[string]interface{}
}
// NewUserSession 创建新的用户会话
func NewUserSession(userID string) *UserSession {
return &UserSession{
userID: userID,
data: make(map[string]interface{}),
}
}
// SessionManager 用户会话管理器,使用sync.Map进行管理
type SessionManager struct {
sessions sync.Map
}
// NewSessionManager 创建新的会话管理器
func NewSessionManager() *SessionManager {
return &SessionManager{
sessions: sync.Map{},
}
}
// AddSession 添加用户会话
func (s *SessionManager) AddSession(session *UserSession) {
s.sessions.Store(session.userID, session)
fmt.Printf("Session added for user %s\n", session.userID)
}
// GetSession 获取用户会话
func (s *SessionManager) GetSession(userID string) (*UserSession, bool) {
value, ok := s.sessions.Load(userID)
if ok {
return value.(*UserSession), true
}
return nil, false
}
func main() {
// 初始化组件
catalog := NewProductCatalog()
orderQueue := NewOrderQueue()
dbPool := NewDatabasePool(5)
sessionMgr := NewSessionManager()
// 添加测试产品到目录
testProduct := Product{ID: "P001", Name: "Test Product", Price: 19.99, Stock: 100}
catalog.AddProduct(testProduct)
// 测试获取产品信息
p, err := catalog.GetProduct("P001")
if err == nil {
fmt.Printf("Product info: %+v\n", p)
} else {
fmt.Println("Product not found")
}
// 模拟用户下单,添加订单
testOrder := Order{ID: "O001", ProductID: "P001", Quantity: 2, Status: "pending"}
orderQueue.AddOrder(testOrder)
// 启动订单处理goroutine
go orderQueue.ProcessOrder()
// 模拟获取数据库连接
conn := dbPool.GetConnection()
// 使用连接进行数据库操作
time.Sleep(1 * time.Second)
dbPool.ReturnConnection(conn)
// 模拟用户登录,创建并管理会话
session := NewUserSession("user123")
sessionMgr.AddSession(session)
retrievedSession, found := sessionMgr.GetSession("user123")
if found {
fmt.Printf("Session retrieved for user %s\n", retrievedSession.userID)
} else {
fmt.Printf("Session not found for user %s\n", "user123")
}
// 等待所有goroutine完成
fmt.Println("Main function completed")
}
代码解释
- ProductCatalog:使用RWMutex保护产品目录,确保读写操作的安全性。
- OrderQueue:使用Mutex和Cond协调订单的生成和处理,确保订单处理的正确性。
- DatabasePool:使用Pool管理数据库连接,优化资源的使用和释放。
- SessionManager:使用sync.Map管理用户会话,确保高并发下的安全访问。
运行结果示例
Product P001 added successfully
Product info: {ID:P001 Name:Test Product Price:19.99 Stock:100}
Order O001 added to queue
Get connection 0
Processing order O001
Order O001 processed successfully
Session added for user user123
Session retrieved for user user123
Main function completed