golang 同步与锁

发布于:2025-04-02 ⋅ 阅读:(20) ⋅ 点赞:(0)

目录

一、sync包简介

二、Mutex(互斥锁)

功能特点

常见用法

示例代码

三、RWMutex(读写互斥锁)

功能特点

常见用法

示例代码

四、WaitGroup(等待组)

功能特点

常见用法

示例代码

五、Cond(条件变量)

功能特点

常见用法

示例代码

六、Map(同步的并发安全的map)

功能特点

常见用法

示例代码

七、Pool(对象池)

功能特点

常见用法

示例代码

八、总结

综合应用示例:在线商店系统

功能概述

代码实现

代码解释

运行结果示例


Go语言提供了多种同步机制和锁,用于控制多个goroutine之间的协作和共享资源的访问,确保数据的安全和一致性。以下是对Go语言中sync包中的主要同步原语和锁的详细讲解,以及完整的代码示例。


一、sync包简介

sync包提供了基本的同步原语,包括:

  • Mutex:互斥锁,用于保护临界区,防止多个goroutine同时访问共享资源。
  • RWMutex:读写互斥锁,允许多个读者同时访问共享资源,而写者必须独占。
  • WaitGroup:用于等待一组goroutine完成。
  • Cond:条件变量,用于在goroutine之间协调事件发生的顺序。
  • Map:一个并发安全的map,适合频繁的读写场景。
  • Pool:对象池,用于管理一组可重用的对象,减少内存分配和垃圾回收的开销。

二、Mutex(互斥锁)

功能特点
  • 互斥性:同时只能有一个goroutine持有Mutex。
  • 阻塞与等待:如果Mutex已被锁定,尝试获取Mutex的goroutine将被阻塞,直到Mutex被释放。
  • 手动控制:需要显式调用Lock()Unlock()方法来获取和释放锁。
常见用法
  • 保护共享变量:防止多个goroutine同时修改共享变量导致数据竞态。
  • 控制对资源的并发访问:如文件句柄、网络连接等。
示例代码
package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

var (  
    mu      sync.Mutex  
    counter int  
)  

func increment() {  
    for i := 0; i < 5; i++ {  
        // 获取Mutex  
        mu.Lock()  
        // 保证只有一个goroutine可以执行这段代码  
        fmt.Printf("goroutine %d: counter = %d\n", i, counter)  
        counter++  
        // 释放Mutex  
        mu.Unlock()  

        // 模拟其他任务  
        time.Sleep(100 * time.Millisecond)  
    }  
}  

func main() {  
    var wg sync.WaitGroup  

    // 启动5个goroutine  
    for i := 0; i < 5; i++ {  
        wg.Add(1)  
        go func(i int) {  
            defer wg.Done()  
            increment()  
        }(i)  
    }  

    // 等待所有goroutine完成  
    wg.Wait()  
    fmt.Println("Final counter:", counter)  
}  

三、RWMutex(读写互斥锁)

功能特点
  • 读优先:允许多个读者同时访问共享资源。
  • 写独占:当有写者需要修改共享资源时,必须等待所有读者完成后才能获得锁。
  • 高效并发:适用于读多写少的场景,提升性能。
常见用法
  • 保护读多写少的共享数据:如配置信息、缓存等。
  • 减少写操作对读操作的影响
示例代码
package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

var (  
    mu    sync.RWMutex  
    cache = map[string]string{"key": "value"}  
)  

func readData(key string) {  
    mu.RLock()       // 获取读锁  
    defer mu.RUnlock() // 确保锁能被释放  
    fmt.Printf("Reading %s: value = %s\n", key, cache[key])  
    time.Sleep(500 * time.Millisecond)  
}  

func writeData(key, value string) {  
    mu.Lock()      // 获取写锁  
    defer mu.Unlock() // 确保锁能被释放  
    cache[key] = value  
    fmt.Printf("Updating %s to %s\n", key, value)  
    time.Sleep(500 * time.Millisecond)  
}  

func main() {  
    var wg sync.WaitGroup  

    // 启动5个读者  
    for i := 0; i < 5; i++ {  
        wg.Add(1)  
        go func(i int) {  
            defer wg.Done()  
            readData(fmt.Sprintf("key-%d", i))  
        }(i)  
    }  

    // 启动一个写者  
    go func() {  
        writeData("commonKey", "newValue")  
    }()  

    // 等待所有goroutine完成  
    wg.Wait()  
    fmt.Println("Final cache:", cache)  
}  

四、WaitGroup(等待组)

功能特点
  • 等待多个goroutine完成:用于父goroutine等待子goroutine全部完成。
  • 安全地管理goroutine的完成状态
常见用法
  • 协调多个任务:确保所有的子任务完成后再继续执行主任务。
  • 父goroutine等待子goroutine:避免主程序过早退出。
示例代码
package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

func task(id int) {  
    defer fmt.Printf("Task %d completed\n", id)  
    time.Sleep(2 * time.Second)  
    fmt.Printf("Task %d is running\n", id)  
}  

func main() {  
    var wg sync.WaitGroup  

    fmt.Println("Starting tasks...")  
    for i := 1; i <= 3; i++ {  
        wg.Add(1)  
        go task(i)  
    }  

    fmt.Println("Waiting for tasks to complete...")  
    wg.Wait()  
    fmt.Println("All tasks completed!")  
}  

五、Cond(条件变量)

功能特点
  • 协调goroutine之间的事件发生顺序
  • 类似于其他语言中的条件变量,用于在线程之间协调事件发生的顺序
常见用法
  • 等待特定条件满足:如等待某个事件发生。
  • 在生产者-消费者模型中:用来协调生产者和消费者之间的数据传递。
示例代码
package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

var cond *sync.Cond  

func producer() {  
    for i := 0; i < 5; i++ {  
        fmt.Println("Producing", i)  
        cond.Broadcast() // 通知所有等待的goroutine  
        time.Sleep(500 * time.Millisecond)  
    }  
    cond.Broadcast()  
}  

func consumer(id int) {  
    for {  
        fmt.Printf("Consumer %d waiting...\n", id)  
        cond.Wait()   // 等待条件被满足  
        fmt.Printf("Consumer %d received\n", id)  
    }  
}  

func main() {  
    var mu sync.Mutex  
    cond = sync.NewCond(&mu)  

    var wg sync.WaitGroup  

    // 启动5个消费者  
    for i := 1; i <= 5; i++ {  
        wg.Add(1)  
        go func(id int) {  
            defer wg.Done()  
            consumer(id)  
        }(i)  
    }  

    // 启动生产者  
    go producer()  

    // 等待所有消费者完成  
    wg.Wait()  
    fmt.Println("All consumers completed!")  
}  

六、Map(同步的并发安全的map)

功能特点
  • 并发安全:适用于多个goroutine同时读写的场景。
  • 高效:比使用Mutex保护一个map更高效。
  • 便利:不需要自己加锁解锁,使用起来更为方便。
常见用法
  • 高频率的读写操作:适用于需要频繁访问的数据结构。
  • 多goroutine同时操作共享的键值对
示例代码
package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

var safeMap = &sync.Map{}  

func main() {  
    var wg sync.WaitGroup  

    // 写入数据  
    for i := 0; i < 10; i++ {  
        wg.Add(1)  
        go func(i int) {  
            defer wg.Done()  
            key := fmt.Sprintf("key%d", i)  
            safeMap.Store(key, i)  
            fmt.Printf("Stored %s -> %d\n", key, i)  
            time.Sleep(100 * time.Millisecond)  
        }(i)  
    }  

    // 读取数据  
    for i := 0; i < 5; i++ {  
        wg.Add(1)  
        go func(i int) {  
            defer wg.Done()  
            key := fmt.Sprintf("key%d", i)  
            value, ok := safeMap.Load(key)  
            if ok {  
                fmt.Printf("Retrieved %s -> %d\n", key, value)  
            } else {  
                fmt.Printf("Key %s not found\n", key)  
            }  
            time.Sleep(100 * time.Millisecond)  
        }(i)  
    }  

    // 等待所有goroutine完成  
    fmt.Println("Waiting for all goroutines to complete...")  
    wg.Wait()  
}  

七、Pool(对象池)

功能特点
  • 对象复用:减少对象的创建和销毁,提升性能。
  • 资源管理:适用于需要频繁获取和释放资源的场景。
常见用法
  • 数据库连接池:管理多个连接,提高数据库访问的效率。
  • 内存分配优化:减少垃圾回收的开销。
示例代码
package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

type ReusableObject struct {  
    id int  
}  

var pool *sync.Pool  

func init() {  
    pool = &sync.Pool{  
        New: func() interface{} {  
            return &ReusableObject{  
                id: 0,  
            }  
        },  
    }  
}  

func worker(id int) {  
    obj := pool.Get().(*ReusableObject)  
    obj.id = id  
    fmt.Printf("Worker %d is using object with id %d\n", id, obj.id)  
    time.Sleep(500 * time.Millisecond)  
    pool.Put(obj)  
}  

func main() {  
    var wg sync.WaitGroup  

    for i := 1; i <= 5; i++ {  
        wg.Add(1)  
        go func(id int) {  
            defer wg.Done()  
            worker(id)  
        }(i)  
    }  

    fmt.Println("Waiting for all workers to complete...")  
    wg.Wait()  
}  

八、总结

Go语言通过sync包提供了丰富的同步原语和锁,帮助开发者在并发编程中确保数据的安全和一致性。通过理解和掌握这些工具,可以有效避免数据竞态和死锁等问题,编写出高效、可靠的并发程序。

选择合适的同步机制:

  • Mutex:适用于多个goroutine竞争独占资源的场景。
  • RWMutex:适用于读多写少的场景,提升并发性能。
  • WaitGroup:用于等待多个goroutine完成,协调程序流程。
  • Cond:用于在线程之间协调特定事件的发生顺序。
  • Map:适用于频繁的并发读写场景,高效且安全。
  • Pool:用于对象的复用,优化内存分配和管理。

综合应用示例:在线商店系统

以下是一个综合应用示例,模拟一个简单的在线商店系统,展示了Go语言中sync包中各种同步机制的实际应用。这个系统包括产品目录管理、订单处理、用户会话管理等功能,涵盖了Mutex、RWMutex、Cond、Pool和sync.Map的使用。

功能概述

  1. 产品目录:维护一系列产品信息,支持读取和更新操作。
  2. 订单队列:管理客户的订单,确保订单的安全处理。
  3. 数据库连接池:优化数据库连接的使用,提升效率。
  4. 用户会话管理:维护用户的登录状态,确保会话的安全性。

代码实现

package main  

import (  
    "fmt"  
    "sync"  
    "time"  
)  

// Product 代表产品信息  
type Product struct {  
    ID    string  
    Name  string  
    Price float64  
    Stock int  
}  

// ProductCatalog 产品目录,使用RWMutex进行同步控制  
type ProductCatalog struct {  
    products map[string]Product  
    mu      sync.RWMutex  
}  

// NewProductCatalog 创建新的产品目录  
func NewProductCatalog() *ProductCatalog {  
    return &ProductCatalog{  
        products: make(map[string]Product),  
        mu:      sync.RWMutex{},  
    }  
}  

// AddProduct 添加产品到目录中  
func (c *ProductCatalog) AddProduct(p Product) {  
    c.mu.Lock()  
    defer c.mu.Unlock()  
    c.products[p.ID] = p  
    fmt.Printf("Product %s added successfully\n", p.Name)  
}  

// GetProduct 通过ID获取产品信息  
func (c *ProductCatalog) GetProduct(id string) (Product, error) {  
    c.mu.RLock()  
    defer c.mu.RUnlock()  
    p, ok := c.products[id]  
    if !ok {  
        return Product{}, fmt.Errorf("product %s not found", id)  
    }  
    return p, nil  
}  

// UpdateProduct 更新产品信息  
func (c *ProductCatalog) UpdateProduct(id string, name string, price float64) {  
    c.mu.Lock()  
    defer c.mu.Unlock()  
    if _, ok := c.products[id]; ok {  
        c.products[id] = Product{ID: id, Name: name, Price: price, Stock: c.products[id].Stock}  
        fmt.Printf("Product %s updated successfully\n", id)  
    } else {  
        fmt.Printf("Product %s not found\n", id)  
    }  
}  

// Order 代表客户订单  
type Order struct {  
    ID       string  
    ProductID string  
    Quantity  int  
    Status    string  
}  

// OrderQueue 订单队列,使用Mutex和Cond进行同步控制  
type OrderQueue struct {  
    orders []Order  
    mu     sync.Mutex  
    cond   *sync.Cond  
}  

// NewOrderQueue 创建新的订单队列  
func NewOrderQueue() *OrderQueue {  
    return &OrderQueue{  
        orders: make([]Order, 0),  
        mu:    sync.Mutex{},  
        cond:  sync.NewCond(&sync.Mutex{}),  
    }  
}  

// AddOrder 添加订单到队列  
func (q *OrderQueue) AddOrder(order Order) {  
    q.mu.Lock()  
    defer q.mu.Unlock()  
    q.orders = append(q.orders, order)  
    q.cond.Broadcast()  
    fmt.Printf("Order %s added to queue\n", order.ID)  
}  

// ProcessOrder 处理订单队列中的订单  
func (q *OrderQueue) ProcessOrder() {  
    for {  
        q.cond.L.Lock()  
        for len(q.orders) == 0 {  
            fmt.Println("No orders to process. Waiting...")  
            q.cond.L.Wait()  
        }  
        q.cond.L.Unlock()  

        q.mu.Lock()  
        order := q.orders[0]  
        q.orders = q.orders[1:]  
        q.mu.Unlock()  

        fmt.Printf("Processing order %s\n", order.ID)  
        time.Sleep(2 * time.Second)  
        order.Status = "completed"  
        fmt.Printf("Order %s processed successfully\n", order.ID)  
    }  
}  

// DatabaseConnection 数据库连接对象  
type DatabaseConnection struct {  
    ID   int  
    Free bool  
}  

// DatabasePool 数据库连接池,使用Pool进行管理  
type DatabasePool struct {  
    pool *sync.Pool  
}  

// NewDatabasePool 创建新的数据库连接池  
func NewDatabasePool(initialCount int) *DatabasePool {  
    return &DatabasePool{  
        pool: sync.Pool{  
            New: func() interface{} {  
                return &DatabaseConnection{ID: 0, Free: true}  
            },  
        },  
    }  
}  

// GetConnection 从池中获取一个可用的数据库连接  
func (d *DatabasePool) GetConnection() *DatabaseConnection {  
    conn := d.pool.Get().(*DatabaseConnection)  
    conn.Free = false  
    fmt.Printf("Get connection %d\n", conn.ID)  
    return conn  
}  

// ReturnConnection 将连接返回到池中  
func (d *DatabasePool) ReturnConnection(conn *DatabaseConnection) {  
    conn.Free = true  
    d.pool.Put(conn)  
    fmt.Printf("Return connection %d\n", conn.ID)  
}  

// UserSession 用户会话信息,使用sync.Map进行管理  
type UserSession struct {  
    userID string  
    data   map[string]interface{}  
}  

// NewUserSession 创建新的用户会话  
func NewUserSession(userID string) *UserSession {  
    return &UserSession{  
        userID: userID,  
        data:   make(map[string]interface{}),  
    }  
}  

// SessionManager 用户会话管理器,使用sync.Map进行管理  
type SessionManager struct {  
    sessions sync.Map  
}  

// NewSessionManager 创建新的会话管理器  
func NewSessionManager() *SessionManager {  
    return &SessionManager{  
        sessions: sync.Map{},  
    }  
}  

// AddSession 添加用户会话  
func (s *SessionManager) AddSession(session *UserSession) {  
    s.sessions.Store(session.userID, session)  
    fmt.Printf("Session added for user %s\n", session.userID)  
}  

// GetSession 获取用户会话  
func (s *SessionManager) GetSession(userID string) (*UserSession, bool) {  
    value, ok := s.sessions.Load(userID)  
    if ok {  
        return value.(*UserSession), true  
    }  
    return nil, false  
}  

func main() {  
    // 初始化组件  
    catalog := NewProductCatalog()  
    orderQueue := NewOrderQueue()  
    dbPool := NewDatabasePool(5)  
    sessionMgr := NewSessionManager()  

    // 添加测试产品到目录  
    testProduct := Product{ID: "P001", Name: "Test Product", Price: 19.99, Stock: 100}  
    catalog.AddProduct(testProduct)  

    // 测试获取产品信息  
    p, err := catalog.GetProduct("P001")  
    if err == nil {  
        fmt.Printf("Product info: %+v\n", p)  
    } else {  
        fmt.Println("Product not found")  
    }  

    // 模拟用户下单,添加订单  
    testOrder := Order{ID: "O001", ProductID: "P001", Quantity: 2, Status: "pending"}  
    orderQueue.AddOrder(testOrder)  

    // 启动订单处理goroutine  
    go orderQueue.ProcessOrder()  

    // 模拟获取数据库连接  
    conn := dbPool.GetConnection()  
    // 使用连接进行数据库操作  
    time.Sleep(1 * time.Second)  
    dbPool.ReturnConnection(conn)  

    // 模拟用户登录,创建并管理会话  
    session := NewUserSession("user123")  
    sessionMgr.AddSession(session)  
    retrievedSession, found := sessionMgr.GetSession("user123")  
    if found {  
        fmt.Printf("Session retrieved for user %s\n", retrievedSession.userID)  
    } else {  
        fmt.Printf("Session not found for user %s\n", "user123")  
    }  

    // 等待所有goroutine完成  
    fmt.Println("Main function completed")  
}  

代码解释

  1. ProductCatalog:使用RWMutex保护产品目录,确保读写操作的安全性。
  2. OrderQueue:使用Mutex和Cond协调订单的生成和处理,确保订单处理的正确性。
  3. DatabasePool:使用Pool管理数据库连接,优化资源的使用和释放。
  4. SessionManager:使用sync.Map管理用户会话,确保高并发下的安全访问。

运行结果示例

Product P001 added successfully  
Product info: {ID:P001 Name:Test Product Price:19.99 Stock:100}  
Order O001 added to queue  
Get connection 0  
Processing order O001  
Order O001 processed successfully  
Session added for user user123  
Session retrieved for user user123  
Main function completed