go 语言以高并发著称。那么在实际的项目中 经常会用到锁的情况。比如说秒杀抢购等等场景。下面主要介绍 redis 布式锁实现的两种高并发抢购场景。其实 高并发 和 分布式锁 是一个互斥的两个状态:
方式一 setNX:
使用 redis自带的API setNX 来实现。能解决高并发场景下的 绝大多数场景,待优化点 锁的续命 和 等待锁 的实现。实现流程:
- redis setNX 设置键值。如果 键存在则返回 false 反之则为 true
- 使用 setNX 来设置一个键值,值为当前协程设置的随机值。
- 当程序运行完成之后, 删除该键值
这里只有当减库存成功:
抢购流程成功 则返回 410。其余失败则返回 200. 这样就能通过返回码 很容易看到成功抢购的数量 我么使用 postman 模拟 1600 用户点击 十分钟。库存为 一个亿。
// redis分布式锁 方式1:自己动手 // 该方案可以解决大多数场景中的 redis 锁的问题, // 还剩余一个 锁续命的问题 极高并发下的微小概率事件 func redisLock_0(c *gin.Context) { // 实现逻辑 // 1 先用商品ID为 key, uuid为值, 这一步是防止别人把自己的锁删除 // 2 用SetNX 设置一个键值 锁住一个商品,并设置超时时间。 当 SetNX key 存在则 返回false, 反之为 true rdb := Rdb() lockKey := "product_001" newUUID := uuid.New() // 只能删除锁 并切判断是不是自己的锁,只有自己的锁才会删除 defer func() { keyValue, err := rdb.Get(ctx, lockKey).Result() if err != nil { fmt.Println("keyValue error:", keyValue, err) c.JSON(http.StatusOK, gin.H{ "message": "获取锁失败", }) return } if keyValue == newUUID.String() { rdb.Del(ctx, lockKey) } }() //设置锁,30秒过期,只有当锁不存在时才会成功设置, //设置时间是为了 防止特殊情况所没有成功释放。 success, err := rdb.SetNX(ctx, lockKey, newUUID.String(), time.Second*30).Result() if err != nil { fmt.Println("Error setting lock: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "设置锁单出错", }) return } // 判断是否成功获得锁 if success { fmt.Println("Successfully acquired lock:", newUUID) // 执行需要锁保护的操作 获取真实的 库存 count, err := strconv.Atoi(rdb.Get(ctx, "product_count").Val()) if err != nil { fmt.Println("Error getting product count: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "Error getting product count", }) return } if count > 1 { stock := count - 1 err := rdb.Set(ctx, "product_count", strconv.Itoa(stock), 0).Err() if err != nil { fmt.Println("Error setting product count: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "Error setting product count", }) return } else { fmt.Println("减库存操作成功, 现在库存为: %v", stock) c.JSON(http.StatusGone, gin.H{ "message": "Hello, World!", }) return } } else { fmt.Println("库存为 0 ") c.JSON(http.StatusOK, gin.H{ "message": "Hello, World!", }) return } } else { ///没有获得锁! 可以做延迟 轮询处理 fmt.Println("Failed to acquire lock. The key already exists.") c.JSON(http.StatusOK, gin.H{ "message": "Hello, World!", }) return } }
经过十分钟我们看下数据:
该方案整体数据:
- 一共请求了 534,979 次
- 并发 877
- 成功销售 280,367 个商品 即返回值为 410的个数。
方式二 redisson:
使用 go-redisson 库,这个 类似 java redisson:
该方案使用起来就很简单了:
我们来测试一样的数据:
func redisLock_1(c *gin.Context) { //获取一个锁对象 mutex := RedSon().NewMutex("godisson") //尝试加锁, 并且设置超时时间和等待时间, //如果加锁失败 会阻塞等待,或超时 或 加锁成功 err := mutex.TryLock(20000, 20000) if err != nil { log.Println("can't obtained lock") c.JSON(http.StatusOK, gin.H{ "message": "Error can't obtained lock", }) return } defer func(mutex *godisson.Mutex) { _, err := mutex.Unlock() if err != nil { log.Println("can't obtained lock") c.JSON(http.StatusOK, gin.H{ "message": "Error1 can't obtained lock", }) } }(mutex) // 执行需要锁保护的操作 获取真实的 库存 count, err := strconv.Atoi(rdb.Get(ctx, "product_count").Val()) if err != nil { fmt.Println("Error getting product count: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "Error getting product count", }) return } if count > 1 { stock := count - 1 err := rdb.Set(ctx, "product_count", strconv.Itoa(stock), 0).Err() if err != nil { fmt.Println("Error setting product count: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "Error setting product count", }) return } else { fmt.Println("减库存操作成功, 现在库存为: %v", stock) c.JSON(http.StatusGone, gin.H{ "message": "Hello, World!", }) return } } else { fmt.Println("库存为 0 ") c.JSON(http.StatusOK, gin.H{ "message": "Hello, World!", }) return } }
该方案整体数据:
- 一共请求 528,686
- 并发 868
- 成功销售 343,381 个商品 即返回值为 410的个数。应该是实现了锁等待。所有这个方案比自己实现的抢购 要高。
如何提高吞吐 优化性能问题
分段锁:
分段锁的核心思路就是:之前的方案都是一个锁,处理所有请求。这里呢 开十把锁。那吞吐性能不就 快了 十倍了麽。那么我们就采用redisson 来做十把分段锁:
把一个亿的商品库存,分成1千万的 十份。然后用 十把锁。这样:
func redisLock_2(c *gin.Context) { rand.Seed(time.Now().UnixNano()) // 生成包含0和9的随机数 num := rand.Intn(10) mutexKey := "godisson_" + strconv.Itoa(num) product_key := "product_count_" + strconv.Itoa(num) //获取一个锁对象 mutex := RedSon().NewMutex(mutexKey) //尝试加锁, 并且设置超时时间和等待时间, //如果加锁失败 会阻塞等待,或超时 或 加锁成功 err := mutex.TryLock(20000, 20000) if err != nil { log.Println("can't obtained lock") c.JSON(http.StatusOK, gin.H{ "message": "Error can't obtained lock", }) return } defer func(mutex *godisson.Mutex) { _, err := mutex.Unlock() if err != nil { log.Println("can't obtained lock") c.JSON(http.StatusOK, gin.H{ "message": "Error1 can't obtained lock", }) } }(mutex) // 执行需要锁保护的操作 获取真实的 库存 count, err := strconv.Atoi(rdb.Get(ctx, product_key).Val()) if err != nil { fmt.Println("Error getting product count: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "Error getting product count", }) return } if count > 1 { stock := count - 1 err := rdb.Set(ctx, product_key, strconv.Itoa(stock), 0).Err() if err != nil { fmt.Println("Error setting product count: %v", err) c.JSON(http.StatusOK, gin.H{ "message": "Error setting product count", }) return } else { fmt.Println("减库存操作成功, 现在库存为: %v", stock) c.JSON(http.StatusGone, gin.H{ "message": "Hello, World!", }) return } } else { fmt.Println("库存为 0 ") c.JSON(http.StatusOK, gin.H{ "message": "Hello, World!", }) return } }
无超卖情况:
测试结果如下:
- 一共请求 523,418
- 并发 858
- 成功销售 404,238 个商品 即返回值为 410的个数
如此看,不知道是我 单台机器性能跑满了测试不准确还是其他原因。并没有十倍的性能提升。