一、问题的产生:秒杀功能中的超卖现象
在开发秒杀功能时,最初的逻辑很简单:判断商品库存是否大于 0,若大于则扣减库存,否则秒杀失败。然而上线后,出现了库存只有 1 个,却卖出多份的超卖问题。
这是因为在多线程并发场景下,多个线程同时对共享的库存资源进行读写,会导致数据错乱。
private static int stock = 1; // 假设初始库存为1
public static void placeOrder() throws Exception {
if (stock > 0) {
Thread.sleep(100);
stock--;
System.out.println(Thread.currentThread().getName() + "秒杀成功");
} else {
System.out.println(Thread.currentThread().getName() + "秒杀失败!库存不足");
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
placeOrder();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
运行结果:
二、初步尝试:使用synchronized锁
为解决多线程并发问题,我们使用了 synchronized 同步锁对秒杀逻辑进行改造。改造后进行压测,超卖问题确实得到解决。
private static final Object lock = new Object();
private static int stock = 1;
public static void placeOrder() throws Exception {
synchronized (lock) {
if (stock > 0) {
Thread.sleep(100);
stock--;
System.out.println(Thread.currentThread().getName() + "秒杀成功");
} else {
System.out.println(Thread.currentThread().getName() + "秒杀失败!库存不足");
}
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
placeOrder();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
运行结果:
但随着用户量增长,服务器压力增大、性能达到瓶颈。于是我们采用 Nginx 负载均衡进行服务器水平扩展,构建分布式集群。可压测时发现,秒杀功能又出现超卖问题。
这是因为 synchronized是 JVM 级别的锁,只能锁住单个进程内的线程。在分布式部署后,每台服务器的 synchronized 锁只能控制自身服务器内的线程,无法跨服务器协调,多个服务器的线程仍会并发操作库存,导致超卖。
三、分布式锁的引入:Redis方案
为解决分布式场景下的并发问题,我们引入分布式锁,主流的分布式锁实现有 Redis 和 ZooKeeper,这里选择 Redis 来实现。
3.1 Redis分布式锁的核心原理(基于SETNX)
Redis 的 SETNX(Set If Not Exists)命令是实现分布式锁的关键。当一个线程向 Redis 中通过 SETNX 存储一个键值对时:
- 如果该键不存在,就存储成功并返回 True,表示获取到锁。
- 如果该键已存在,存储失败并返回 False,表示获取锁失败。
利用这个特性,我们可以让多个服务器上的线程,通过争抢 Redis 中的 “锁键”,来实现对秒杀资源的互斥访问。
3.2 Redis分布式锁的关键要点
1. 必须设置锁的过期时间
如果不设置过期时间,当持有锁的线程意外挂掉(如服务器宕机),锁会一直存在,其他线程会一直等待,陷入死锁。
2. 处理业务超时问题
若业务处理时间超过锁的过期时间,锁会自动释放,其他线程就会抢占锁,可能导致业务逻辑混乱。
解决方法有两种:
- 延长锁时间 + 心跳机制:加长锁的过期时间,并启动一个子线程,每 10 秒检查持有锁的线程是否在线,若在线则重置锁的过期时间。
- 给锁添加唯一标识:为每把锁设置唯一 ID(如 UUID),确保锁的 key 与持有它的线程绑定,防止线程释放其他线程的锁。
3.3 Redis的特性与red lock
Redis 采用 AP 模型,追求高可用和高性能,但不保证强一致性。
而 red lock 则致力于保证一致性,它要求所有参与的 Redis 节点(主从复制架构中,主节点和从节点都保存成功)都成功保存锁信息,才会返回加锁成功,以此提高分布式锁的可靠性。
四、Redis分布式锁在Java中的实现
在Java中使用Redis实现分布式锁有多种方式,下面我将介绍几种常见的实现方案及其代码示例。
方案一:基于SETNX命令的基础实现
1. 添加Redis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.3</version>
</dependency>
2. 基础实现代码
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;
import java.util.Collections;
public class RedisDistributedLock {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
private Jedis jedis;
public RedisDistributedLock(Jedis jedis) {
this.jedis = jedis;
}
/**
* 尝试获取分布式锁
* @param lockKey 锁的key
* @param requestId 请求标识(用于标识锁的持有者)
* @param expireTime 超期时间(毫秒)
* @return 是否获取成功
*/
public boolean tryLock(String lockKey, String requestId, int expireTime) {
SetParams params = SetParams.setParams()
.nx() // NX: 仅当key不存在时设置
.px(expireTime); // PX: 设置过期时间(毫秒)
String result = jedis.set(lockKey, requestId, params);
return LOCK_SUCCESS.equals(result);
}
/**
* 释放分布式锁
* @param lockKey 锁的key
* @param requestId 请求标识
* @return 是否释放成功
*/
public boolean releaseLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
return RELEASE_SUCCESS.equals(result);
}
/**
* 尝试获取锁(带重试机制)
*/
public boolean lockWithRetry(String lockKey, String requestId,
int expireTime, int retryTimes, long sleepMillis) {
for (int i = 0; i < retryTimes; i++) {
if (tryLock(lockKey, requestId, expireTime)) {
return true;
}
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
}
方案二:使用Redisson框架(推荐)
1. 添加Redisson依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.23.2</version>
</dependency>
2. Redisson实现代码
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
public class RedissonDistributedLock {
private RedissonClient redissonClient;
public RedissonDistributedLock() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
this.redissonClient = Redisson.create(config);
}
/**
* 获取锁
*/
public boolean tryLock(String lockKey, long waitTime, long leaseTime, TimeUnit unit) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 释放锁
*/
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
/**
* 关闭Redisson客户端
*/
public void shutdown() {
if (redissonClient != null) {
redissonClient.shutdown();
}
}
}
3. 使用示例
public class LockExample {
public static void main(String[] args) {
RedissonDistributedLock lockService = new RedissonDistributedLock();
String lockKey = "order:lock:1001";
try {
// 尝试获取锁,最多等待10秒,锁持有时间30秒
boolean acquired = lockService.tryLock(lockKey, 10, 30, TimeUnit.SECONDS);
if (acquired) {
try {
// 执行业务逻辑
processOrder();
} finally {
// 释放锁
lockService.unlock(lockKey);
}
} else {
System.out.println("获取锁失败");
}
} finally {
lockService.shutdown();
}
}
private static void processOrder() {
// 业务处理逻辑
System.out.println("处理订单业务...");
}
}
五、总结
从最初的单线程并发问题,到分布式场景下的并发控制,我们逐步探索出基于 Redis 的分布式锁方案来实现秒杀功能。Redis 分布式锁借助 SETNX 命令,结合过期时间、心跳机制等优化手段,能有效解决分布式秒杀中的超卖问题,同时在高可用、高性能方面也能满足秒杀场景的需求,当然 red lock 还能进一步提升锁的一致性保障。