go 分布式redis锁的实现方式

发布于:2025-03-04 ⋅ 阅读:(19) ⋅ 点赞:(0)

go 语言以高并发著称。那么在实际的项目中 经常会用到锁的情况。比如说秒杀抢购等等场景。下面主要介绍 redis 布式锁实现的两种高并发抢购场景。其实 高并发分布式锁 一个互斥的两个状态:

方式一 setNX:

使用 redis自带的API setNX 来实现。能解决高并发场景下的 绝大多数场景,待优化点 锁的续命 和 等待锁 的实现。实现流程:

  1. redis setNX 设置键值。如果 键存在则返回 false 反之则为 true
  2. 使用 setNX 来设置一个键值,值为当前协程设置的随机值。
  3. 当程序运行完成之后, 删除该键值
 这里只有当减库存成功

抢购流程成功 则返回 410其余失败则返回 200这样就能通过返回码  很容易看到成功抢购的数量 我么使用 postman 模拟 1600 用户点击 十分钟。库存为 一个亿。

// redis分布式锁  方式1:自己动手
// 该方案可以解决大多数场景中的 redis 锁的问题,
// 还剩余一个 锁续命的问题 极高并发下的微小概率事件
func redisLock_0(c *gin.Context) {
	// 实现逻辑
	// 1 先用商品ID为 key, uuid为值,  这一步是防止别人把自己的锁删除
	// 2 用SetNX 设置一个键值 锁住一个商品,并设置超时时间。 当 SetNX key 存在则 返回false, 反之为 true
	rdb := Rdb()
	lockKey := "product_001"
	newUUID := uuid.New()
	// 只能删除锁  并切判断是不是自己的锁,只有自己的锁才会删除
	defer func() {
		keyValue, err := rdb.Get(ctx, lockKey).Result()
		if err != nil {
			fmt.Println("keyValue error:", keyValue, err)
			c.JSON(http.StatusOK, gin.H{
				"message": "获取锁失败",
			})
			return
		}
		if keyValue == newUUID.String() {
			rdb.Del(ctx, lockKey)
		}
	}()

	//设置锁,30秒过期,只有当锁不存在时才会成功设置,
	//设置时间是为了 防止特殊情况所没有成功释放。
	success, err := rdb.SetNX(ctx, lockKey, newUUID.String(), time.Second*30).Result()
	if err != nil {
		fmt.Println("Error setting lock: %v", err)
		c.JSON(http.StatusOK, gin.H{
			"message": "设置锁单出错",
		})
		return
	}
	// 判断是否成功获得锁
	if success {
		fmt.Println("Successfully acquired lock:", newUUID)
		// 执行需要锁保护的操作 获取真实的 库存
		count, err := strconv.Atoi(rdb.Get(ctx, "product_count").Val())
		if err != nil {
			fmt.Println("Error getting product count: %v", err)
			c.JSON(http.StatusOK, gin.H{
				"message": "Error getting product count",
			})
			return
		}
		if count > 1 {
			stock := count - 1
			err := rdb.Set(ctx, "product_count", strconv.Itoa(stock), 0).Err()
			if err != nil {
				fmt.Println("Error setting product count: %v", err)
				c.JSON(http.StatusOK, gin.H{
					"message": "Error setting product count",
				})
				return
			} else {
				fmt.Println("减库存操作成功, 现在库存为: %v", stock)
				c.JSON(http.StatusGone, gin.H{
					"message": "Hello, World!",
				})
				return
			}
		} else {
			fmt.Println("库存为 0 ")
			c.JSON(http.StatusOK, gin.H{
				"message": "Hello, World!",
			})
			return
		}
	} else {
		///没有获得锁!  可以做延迟 轮询处理
		fmt.Println("Failed to acquire lock. The key already exists.")
		c.JSON(http.StatusOK, gin.H{
			"message": "Hello, World!",
		})
		return
	}
}

经过十分钟我们看下数据:

该方案整体数据:
  • 一共请求了 534,979 次
  • 并发 877
  • 成功销售 280,367 个商品 即返回值为 410的个数。

方式二 redisson:

使用  go-redisson 库,这个 类似 java redisson:

go-redisson command - github.com/paceew/go-redisson - Go Packageshttps://pkg.go.dev/github.com/paceew/go-redisson

该方案使用起来就很简单了:

我们来测试一样的数据:

func redisLock_1(c *gin.Context) {
	//获取一个锁对象
	mutex := RedSon().NewMutex("godisson")
	//尝试加锁, 并且设置超时时间和等待时间,
	//如果加锁失败 会阻塞等待,或超时 或 加锁成功
	err := mutex.TryLock(20000, 20000)
	if err != nil {
		log.Println("can't obtained lock")
		c.JSON(http.StatusOK, gin.H{
			"message": "Error can't obtained lock",
		})
		return
	}
	defer func(mutex *godisson.Mutex) {
		_, err := mutex.Unlock()
		if err != nil {
			log.Println("can't obtained lock")
			c.JSON(http.StatusOK, gin.H{
				"message": "Error1 can't obtained lock",
			})
		}
	}(mutex)

	// 执行需要锁保护的操作 获取真实的 库存
	count, err := strconv.Atoi(rdb.Get(ctx, "product_count").Val())
	if err != nil {
		fmt.Println("Error getting product count: %v", err)
		c.JSON(http.StatusOK, gin.H{
			"message": "Error getting product count",
		})
		return
	}
	if count > 1 {
		stock := count - 1
		err := rdb.Set(ctx, "product_count", strconv.Itoa(stock), 0).Err()
		if err != nil {
			fmt.Println("Error setting product count: %v", err)
			c.JSON(http.StatusOK, gin.H{
				"message": "Error setting product count",
			})
			return
		} else {
			fmt.Println("减库存操作成功, 现在库存为: %v", stock)
			c.JSON(http.StatusGone, gin.H{
				"message": "Hello, World!",
			})
			return
		}
	} else {
		fmt.Println("库存为 0 ")
		c.JSON(http.StatusOK, gin.H{
			"message": "Hello, World!",
		})
		return
	}
}

 

该方案整体数据:
  • 一共请求 528,686
  • 并发 868
  • 成功销售 343,381 个商品  即返回值为 410的个数。应该是实现了锁等待。所有这个方案比自己实现的抢购 要高。

如何提高吞吐 优化性能问题 

分段锁:

分段锁的核心思路就是:之前的方案都是一个锁,处理所有请求。这里呢 开十把锁。那吞吐性能不就 快了 十倍了麽。那么我们就采用redisson 来做十把分段锁:

把一个亿的商品库存,分成1千万的 十份。然后用 十把锁。这样:

func redisLock_2(c *gin.Context) {

	rand.Seed(time.Now().UnixNano())
	// 生成包含0和9的随机数
	num := rand.Intn(10)
	mutexKey := "godisson_" + strconv.Itoa(num)
	product_key := "product_count_" + strconv.Itoa(num)

	//获取一个锁对象
	mutex := RedSon().NewMutex(mutexKey)
	//尝试加锁, 并且设置超时时间和等待时间,
	//如果加锁失败 会阻塞等待,或超时 或 加锁成功
	err := mutex.TryLock(20000, 20000)
	if err != nil {
		log.Println("can't obtained lock")
		c.JSON(http.StatusOK, gin.H{
			"message": "Error can't obtained lock",
		})
		return
	}
	defer func(mutex *godisson.Mutex) {
		_, err := mutex.Unlock()
		if err != nil {
			log.Println("can't obtained lock")
			c.JSON(http.StatusOK, gin.H{
				"message": "Error1 can't obtained lock",
			})
		}
	}(mutex)

	// 执行需要锁保护的操作 获取真实的 库存
	count, err := strconv.Atoi(rdb.Get(ctx, product_key).Val())
	if err != nil {
		fmt.Println("Error getting product count: %v", err)
		c.JSON(http.StatusOK, gin.H{
			"message": "Error getting product count",
		})
		return
	}
	if count > 1 {
		stock := count - 1
		err := rdb.Set(ctx, product_key, strconv.Itoa(stock), 0).Err()
		if err != nil {
			fmt.Println("Error setting product count: %v", err)
			c.JSON(http.StatusOK, gin.H{
				"message": "Error setting product count",
			})
			return
		} else {
			fmt.Println("减库存操作成功, 现在库存为: %v", stock)
			c.JSON(http.StatusGone, gin.H{
				"message": "Hello, World!",
			})
			return
		}
	} else {
		fmt.Println("库存为 0 ")
		c.JSON(http.StatusOK, gin.H{
			"message": "Hello, World!",
		})
		return
	}
}


 无超卖情况:

测试结果如下:
  • 一共请求 523,418
  • 并发 858
  • 成功销售 404,238 个商品  即返回值为 410的个数

如此看,不知道是我 单台机器性能跑满了测试不准确还是其他原因。并没有十倍的性能提升