🔄 Redis 与分布式事务:最终一致性的实践艺术
文章目录
🧠 一、分布式事务基础
💡 CAP 与 BASE 理论
在分布式系统中,一致性与可用性的平衡是核心挑战:
Redis 在分布式事务中的定位:
⚡ 高性能缓存:加速事务数据访问
📊 状态管理:存储事务状态和中间结果
🔄 消息队列:作为事务事件发布/订阅通道
⏰ 分布式锁:协调分布式资源访问
📊 分布式事务模式对比
模式 | 一致性级别 | 性能 | 复杂度 | 适用场景 |
---|---|---|---|---|
2PC | 强一致性 | 低 | 高 | 金融交易 |
TCC | 最终一致性 | 中 | 高 | 电商订单 |
Saga | 最终一致性 | 中 | 中 | 长事务 |
本地消息表 | 最终一致性 | 中 | 低 | 异步事务 |
最大努力通知 | 弱一致性 | 高 | 低 | 通知类业务 |
⚡ 二、TCC 事务模型详解
💡 TCC 三阶段流程
TCC(Try-Confirm-Cancel)是分布式事务的经典模式,通过业务补偿实现最终一致性:
🏗️ Redis 在 TCC 中的角色
Redis 作为 TCC 状态管理器:
public class TccStatusManager {
private static final String TCC_STATUS_PREFIX = "tcc:status:";
private static final String TCC_LOCK_PREFIX = "tcc:lock:";
// 记录Try阶段状态
public boolean recordTryStatus(String xid, String service, Map<String, Object> params) {
String key = TCC_STATUS_PREFIX + xid + ":" + service;
String lockKey = TCC_LOCK_PREFIX + xid;
// 使用分布式锁确保原子性
try (DistributedLock lock = acquireLock(lockKey)) {
if (lock.tryLock()) {
// 存储Try阶段数据
jedis.hset(key, "status", "try");
jedis.hset(key, "params", serialize(params));
jedis.expire(key, 3600); // 1小时超时
return true;
}
return false;
}
}
// 查询事务状态
public String getTransactionStatus(String xid) {
String pattern = TCC_STATUS_PREFIX + xid + ":*";
Set<String> keys = jedis.keys(pattern);
// 分析所有参与服务的状态
Map<String, String> statusMap = new HashMap<>();
for (String key : keys) {
String status = jedis.hget(key, "status");
String service = extractServiceName(key);
statusMap.put(service, status);
}
return determineOverallStatus(statusMap);
}
}
⚙️ TCC 超时与重试机制
基于 Redis 的重试队列:
public class TccRetryManager {
private static final String RETRY_QUEUE = "tcc:retry:queue";
private static final String RETRY_COUNT_PREFIX = "tcc:retry:count:";
// 添加重试任务
public void addRetryTask(String xid, String service, String operation) {
Map<String, String> task = new HashMap<>();
task.put("xid", xid);
task.put("service", service);
task.put("operation", operation);
task.put("timestamp", String.valueOf(System.currentTimeMillis()));
// 使用有序集合存储重试任务
jedis.zadd(RETRY_QUEUE, System.currentTimeMillis(), serialize(task));
// 初始化重试计数器
jedis.set(RETRY_COUNT_PREFIX + xid + ":" + service + ":" + operation, "0");
}
// 处理重试任务
public void processRetryTasks() {
while (true) {
Set<String> tasks = jedis.zrangeByScore(RETRY_QUEUE, 0, System.currentTimeMillis(), 0, 10);
for (String taskStr : tasks) {
Map<String, String> task = deserialize(taskStr);
if (shouldRetry(task)) {
executeRetry(task);
jedis.zrem(RETRY_QUEUE, taskStr);
}
}
sleep(1000); // 每秒检查一次
}
}
}
🔄 三、补偿机制与幂等性
💡 幂等性设计模式
基于 Redis 的幂等控制:
public class IdempotentController {
private static final String IDEMPOTENT_PREFIX = "idempotent:";
private static final int DEFAULT_EXPIRE = 86400; // 24小时
// 检查幂等键
public boolean checkIdempotent(String idempotentKey) {
String key = IDEMPOTENT_PREFIX + idempotentKey;
// 使用SETNX实现原子性检查
Long result = jedis.setnx(key, "1");
if (result == 1) {
jedis.expire(key, DEFAULT_EXPIRE);
return false; // 第一次请求
}
return true; // 重复请求
}
// 生成幂等键(业务标识+唯一ID)
public String generateIdempotentKey(String business, String uniqueId) {
return business + ":" + uniqueId;
}
}
🛡️ 补偿事务实现
基于 Redis 的补偿日志:
public class CompensationLogger {
private static final String COMPENSATION_LOG = "compensation:log";
// 记录补偿操作
public void logCompensation(String xid, String service, String operation,
Object params, String status) {
Map<String, String> logEntry = new HashMap<>();
logEntry.put("xid", xid);
logEntry.put("service", service);
logEntry.put("operation", operation);
logEntry.put("params", serialize(params));
logEntry.put("status", status);
logEntry.put("timestamp", String.valueOf(System.currentTimeMillis()));
// 使用Stream存储补偿日志
jedis.xadd(COMPENSATION_LOG, StreamEntryID.NEW_ENTRY, logEntry);
}
// 查询需要补偿的操作
public List<Map<String, String>> findCompensations(String xid) {
List<StreamEntry> entries = jedis.xrange(COMPENSATION_LOG, "-", "+");
return entries.stream()
.filter(entry -> xid.equals(entry.getFields().get("xid")))
.filter(entry -> "need_compensation".equals(entry.getFields().get("status")))
.map(StreamEntry::getFields)
.collect(Collectors.toList());
}
}
⚡ Lua 脚本保证原子性
原子性补偿操作:
-- 原子性检查并执行补偿
local function compensateOperation(key, expectedValue, compensationScript)
local currentValue = redis.call('GET', key)
if currentValue == expectedValue then
-- 执行补偿操作
redis.call('EVAL', compensationScript, 0)
return true
else
return false
end
end
-- 库存补偿示例
local stockCompensation = [[
redis.call('HINCRBY', KEYS[1], 'available_stock', ARGV[1])
redis.call('HINCRBY', KEYS[1], 'locked_stock', -ARGV[1])
]]
-- 执行补偿
compensateOperation('order:1234:status', 'cancelled', stockCompensation)
🚀 四、Redis 实战应用
🛒 电商订单-库存一致性
分布式事务场景:用户下单时,需要同时扣减库存和创建订单:
Redis 实现的关键代码:
public class OrderInventoryCoordinator {
private static final String ORDER_PREFIX = "order:";
private static final String INVENTORY_PREFIX = "inventory:";
// Try阶段:预扣库存
public boolean tryLockInventory(String productId, int quantity, String xid) {
String key = INVENTORY_PREFIX + productId;
String lockField = "locked_stock";
String availableField = "available_stock";
// 使用Lua脚本保证原子性
String script = "
local available = tonumber(redis.call('HGET', KEYS[1], ARGV[2]))
if available >= tonumber(ARGV[1]) then
redis.call('HSET', KEYS[1], ARGV[2], available - tonumber(ARGV[1]))
redis.call('HINCRBY', KEYS[1], ARGV[3], tonumber(ARGV[1]))
redis.call('HSET', KEYS[1], 'lock_xid:' .. ARGV[4], ARGV[1])
return 1
else
return 0
end
";
Object result = jedis.eval(script, 1, key,
String.valueOf(quantity), availableField, lockField, xid);
return Long.valueOf(1).equals(result);
}
// Confirm阶段:确认扣减
public boolean confirmInventory(String productId, String xid) {
String key = INVENTORY_PREFIX + productId;
String lockField = "locked_stock";
// 获取预锁数量
String lockedAmount = jedis.hget(key, "lock_xid:" + xid);
if (lockedAmount == null) {
return false;
}
// 清理锁定记录,完成扣减
jedis.hdel(key, "lock_xid:" + xid);
return true;
}
// Cancel阶段:释放库存
public boolean cancelInventory(String productId, String xid) {
String key = INVENTORY_PREFIX + productId;
String lockField = "locked_stock";
String availableField = "available_stock";
String script = "
local locked = redis.call('HGET', KEYS[1], 'lock_xid:' .. ARGV[2])
if locked then
redis.call('HINCRBY', KEYS[1], ARGV[3], tonumber(locked))
redis.call('HINCRBY', KEYS[1], ARGV[1], -tonumber(locked))
redis.call('HDEL', KEYS[1], 'lock_xid:' .. ARGV[2])
return 1
end
return 0
";
Object result = jedis.eval(script, 1, key, lockField, xid, availableField);
return Long.valueOf(1).equals(result);
}
}
🔄 Saga 模式实现
基于 Redis 的 Saga 状态机:
public class SagaStateMachine {
private static final String SAGA_STATE_PREFIX = "saga:state:";
private static final String SAGA_LOG_PREFIX = "saga:log:";
// 执行Saga事务
public void executeSaga(String xid, List<SagaStep> steps) {
// 记录初始状态
jedis.set(SAGA_STATE_PREFIX + xid, "started");
for (int i = 0; i < steps.size(); i++) {
SagaStep step = steps.get(i);
try {
// 执行正向操作
step.execute();
// 记录成功状态
jedis.hset(SAGA_LOG_PREFIX + xid, "step_" + i, "completed");
} catch (Exception e) {
// 执行补偿操作
compensate(xid, i);
jedis.set(SAGA_STATE_PREFIX + xid, "compensated");
throw e;
}
}
jedis.set(SAGA_STATE_PREFIX + xid, "completed");
}
// 补偿操作
private void compensate(String xid, int failedStep) {
for (int i = failedStep; i >= 0; i--) {
try {
// 执行补偿操作
SagaStep step = steps.get(i);
step.compensate();
jedis.hset(SAGA_LOG_PREFIX + xid, "compensate_step_" + i, "completed");
} catch (Exception e) {
// 记录补偿失败,需要人工干预
jedis.hset(SAGA_LOG_PREFIX + xid, "compensate_step_" + i, "failed");
throw new SagaCompensationException("Compensation failed at step " + i, e);
}
}
}
}
💡 五、总结与架构对比
📊 分布式事务方案对比
方案 | 一致性保证 | 性能 | 复杂度 | 适用场景 | Redis 作用 |
---|---|---|---|---|---|
Redis 事务 | 弱一致性 | 高 | 低 | 简单场景 | 核心事务机制 |
TCC 模式 | 最终一致性 | 中 | 高 | 电商业务 | 状态管理/协调 |
Saga 模式 | 最终一致性 | 中 | 中 | 长事务 | 状态机持久化 |
本地消息表 | 最终一致性 | 中 | 中 | 异步业务 | 消息存储 |
2PC/XA | 强一致性 | 低 | 高 | 金融业务 | 资源管理 |
🏗️ Redis 与 Seata 集成
Seata 结合 Redis 的优化方案:
public class SeataRedisIntegration {
// 使用Redis存储Seata事务状态
public void enhanceSeataWithRedis() {
// 1. 事务状态缓存
String globalTxKey = "seata:global:" + xid;
jedis.hset(globalTxKey, "status", status);
jedis.expire(globalTxKey, 3600);
// 2. 分支事务记录
String branchTxKey = "seata:branch:" + xid + ":" + branchId;
jedis.hset(branchTxKey, "status", branchStatus);
jedis.expire(branchTxKey, 3600);
// 3. 快速状态查询
public String getTransactionStatus(String xid) {
return jedis.hget("seata:global:" + xid, "status");
}
}
}
🚀 生产环境最佳实践
1. Redis 配置优化:
# redis.conf 事务相关配置
maxmemory 2gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec
# 连接池配置
maxTotal 100
maxIdle 50
minIdle 10
testOnBorrow true
2. 监控与告警:
# 监控事务相关指标
redis-cli info stats | grep rejected
redis-cli info memory | grep used_memory
redis-cli info persistence | grep rdb_last_save_time
# 设置告警规则
- 内存使用率 > 80%
- 键空间命中率 < 90%
- 持久化延迟 > 5秒
3. 灾备与恢复:
public class TransactionRecovery {
// 事务恢复机制
public void recoverPendingTransactions() {
// 1. 扫描未完成的事务
Set<String> pendingTx = jedis.keys("tcc:status:*");
for (String key : pendingTx) {
String status = jedis.hget(key, "status");
String xid = extractXidFromKey(key);
if ("try_success".equals(status)) {
// 2. 检查事务超时
long createTime = Long.parseLong(jedis.hget(key, "create_time"));
if (System.currentTimeMillis() - createTime > MAX_HOLD_TIME) {
// 3. 执行自动补偿
autoCompensate(xid);
}
}
}
}
}
🔮 未来发展趋势
1. Serverless 架构下的分布式事务:
// 基于函数计算的分布式事务
public class ServerlessTransaction {
// 使用Redis作为事务状态存储
public void handleTransactionEvent(String event) {
// 1. 记录事务状态
String statusKey = "serverless:tx:" + transactionId;
jedis.set(statusKey, "processing");
// 2. 执行业务逻辑
processBusinessLogic(event);
// 3. 更新状态
jedis.set(statusKey, "completed");
}
}
- 云原生集成:
- Kubernetes Operator 自动管理 Redis 集群
- 服务网格集成分布式事务
- 自动扩缩容应对流量高峰
3. 人工智能优化:
- 智能预测事务冲突
- 自动优化事务超时时间
- 动态调整重试策略