基于Redis分布锁+事务补偿解决数据不一致性问题

发布于:2025-03-23 ⋅ 阅读:(28) ⋅ 点赞:(0)

基于Redis的分布式设备库存服务设计与实现

概述

本文介绍一个基于Redis实现的分布式设备库存服务方案,通过分布式锁、重试机制和事务补偿等关键技术,保证在并发场景下库存操作的原子性和一致性。该方案适用于物联网设备管理、分布式资源调度等场景。

代码实现


import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.SetParams;


// 模拟设备库存服务
public class DeviceInventoryService {
    private static final Logger logger = LoggerFactory.getLogger(DeviceInventoryService.class);
    private final Map<String, Integer> inventoryMap = new HashMap<>();
    private static final int MAX_RETRIES = 3;
    private static final int LOCK_EXPIRE_TIME = 10; // 锁的过期时间,单位:秒
    private final Jedis jedis;

    public DeviceInventoryService(Jedis jedis) {
        this.jedis = jedis;
    }

    // 初始化库存
    public void initializeInventory(String deviceId, int quantity) {
        inventoryMap.put(deviceId, quantity);
        logger.info("设备 {} 初始化库存为 {}", deviceId, quantity);
    }

    // 尝试获取分布式锁
    private boolean tryLock(String lockKey) {
        SetParams setParams = SetParams.setParams().nx().ex(LOCK_EXPIRE_TIME);
        String result = jedis.set(lockKey, "locked", setParams);
        return "OK".equals(result);
    }

    // 释放分布式锁
    private void releaseLock(String lockKey) {
        jedis.del(lockKey);
    }

    // 定时更新库存
    public boolean updateInventory(String deviceId, int updateQuantity) {
        String lockKey = "inventory_lock:" + deviceId;
        int retries = 0;
        //重试次数
        while (retries < MAX_RETRIES) {
            if (tryLock(lockKey)) {
                try {
                    return doUpdateInventory(deviceId, updateQuantity);
                } catch (Exception e) {
                    logger.error("设备 {} 库存更新失败,重试第 {} 次", deviceId, retries + 1, e);
                } finally {
                    releaseLock(lockKey);
                }
            }
            retries++;
            try {
                Thread.sleep(100); // 等待一段时间后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        logger.error("设备 {} 库存更新失败,达到最大重试次数", deviceId);
        return false;
    }

    // 实际执行库存更新操作
    private boolean doUpdateInventory(String deviceId, int updateQuantity) {
        int oldQuantity = inventoryMap.getOrDefault(deviceId, 0);
        try {
            // 记录操作日志
            logger.info("设备 {} 开始更新库存,更新前库存: {}", deviceId, oldQuantity);

            // 模拟更新操作
            int newQuantity = oldQuantity + updateQuantity;
            if (newQuantity < 0) {
                throw new IllegalArgumentException("库存不能为负数");
            }
            inventoryMap.put(deviceId, newQuantity);
            logger.info("设备 {} 库存更新成功,当前库存: {}", deviceId, newQuantity);
            return true;
        } catch (Exception e) {
            logger.error("设备 {} 库存更新失败: {}", deviceId, e.getMessage());
            // 进行事务补偿
            compensateInventory(deviceId, oldQuantity);
            return false;
        }
    }

    // 事务补偿
    private void compensateInventory(String deviceId, int oldQuantity) {
        inventoryMap.put(deviceId, oldQuantity);
        logger.info("设备 {} 库存已恢复到更新前的状态,当前库存: {}", deviceId, oldQuantity);
    }


    // 模拟定时任务
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            DeviceInventoryService service = new DeviceInventoryService(jedis);
            service.initializeInventory("device001", 10);

            // 模拟定时更新库存
            service.updateInventory("device001", 5);
            service.updateInventory("device001", -20); // 模拟更新失败
        }
    }

}

核心设计

分布式锁机制

private boolean tryLock(String lockKey) {
        SetParams setParams = SetParams.setParams().nx().ex(LOCK_EXPIRE_TIME);
        String result = jedis.set(lockKey, "locked", setParams);
        return "OK".equals(result);
}
  • 使用Redis的set nx ex命令实现原子性加锁
  • 将锁的颗粒度设置到了设备上(根据实际业务设置)
  • 设置10秒过期时间,防止死锁(根据实际业务设置过期时间)

重试机制

		int retries = 0;
        //重试次数
        while (retries < MAX_RETRIES) {
            if (tryLock(lockKey)) {
                try {
                    return doUpdateInventory(deviceId, updateQuantity);
                } catch (Exception e) {
                    logger.error("设备 {} 库存更新失败,重试第 {} 次", deviceId, retries + 1, e);
                } finally {
                    releaseLock(lockKey);
                }
            }
            retries++;
            try {
                Thread.sleep(100); // 等待一段时间后重试
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
  • 最大重试次数三次(MAX_RETRIES)
  • 如果没有获取到锁则等待重试,超过重试次数则终止

补偿机制

private void compensateInventory(String deviceId, int oldQuantity) {
        inventoryMap.put(deviceId, oldQuantity);
        logger.info("设备 {} 库存已恢复到更新前的状态,当前库存: {}", deviceId, oldQuantity);
}
  • 在doUpdateInventory捕获异常后自动回滚
  • 基于版本号/快照的恢复机制
  • 保证最终数据一致性

关键代码解析

public boolean updateInventory(String deviceId, int updateQuantity) {
    String lockKey = "inventory_lock:" + deviceId;
    int retries = 0;
    
    while (retries < MAX_RETRIES) {
        if (tryLock(lockKey)) {
            try {
                return doUpdateInventory(deviceId, updateQuantity);
            } finally {
                releaseLock(lockKey);
            }
        }
        // ...重试逻辑...
    }
    return false;
}
  • 获取设备级别的分布式锁
  • 执行库存更新操作
  • 无论成功失败都释放锁(finally保证)
  • 达到重试上限后返回失败

核心操作方法

private boolean doUpdateInventory(String deviceId, int updateQuantity) {
    int oldQuantity = inventoryMap.getOrDefault(deviceId, 0);
    int newQuantity = oldQuantity + updateQuantity;
    
    if (newQuantity < 0) {
        throw new IllegalArgumentException("库存不能为负数");
    }
    inventoryMap.put(deviceId, newQuantity);
    return true;
}
  • 前置校验:库存不能为负数
  • 原子性操作:库存增减计算
  • 事务性更新:先计算后写入

使用示例

初始化与测试

public static void main(String[] args) {
    try (Jedis jedis = new Jedis("localhost", 6379)) {
        DeviceInventoryService service = new DeviceInventoryService(jedis);
        service.initializeInventory("device001", 10);

        service.updateInventory("device001", 5);  // 成功:库存15
        service.updateInventory("device001", -20); // 失败:触发补偿
    }
}

预期输出

INFO - 设备 device001 初始化库存为 10
INFO - 设备 device001 开始更新库存,更新前库存: 10
INFO - 设备 device001 库存更新成功,当前库存: 15
INFO - 设备 device001 开始更新库存,更新前库存: 15
ERROR - 设备 device001 库存更新失败: 库存不能为负数
INFO - 设备 device001 库存已恢复到更新前的状态,当前库存: 15

扩展思考

优化方向

  1. Redis集群支持:当前为单节点Redis,可升级为Redis Cluster
  2. 锁续期机制:添加看门狗线程自动续期锁
  3. 库存持久化:结合数据库实现库存持久化存储
  4. 监控体系:添加Prometheus监控指标

注意事项

  1. 网络分区场景下可能出现锁状态不一致
  2. 库存更新操作应保持幂等性
  3. Redis连接需要配置合理的超时参数
  4. 生产环境建议使用Lua脚本保证原子性

通过本文实现的库存服务,在保证线程安全的基础上,能够有效应对分布式环境下的资源竞争问题。实际部署时建议结合具体业务场景进行压力测试和参数调优。