Redis分布式锁的学习(八)

发布于:2025-07-26 ⋅ 阅读:(17) ⋅ 点赞:(0)

一、分布式锁

1.1、分布式锁是什么?

是一种在分布式系统中协调多个进程/服务对共享资源进行互斥访问的机制;确保在任意时刻,只有一个客户端可以访问资源。
在这里插入图片描述

1.2、为什么需要分布式锁?

  • 解决多个服务/进程对同共享资源竞争,比如双11,618购物节,多名用户对同一个商品下单,导致库存超卖问题。
  • 防止重复操作,比如用户连续点击导致重复下单

1.3、分布式锁需要具备的条件和刚需有哪些?

  • 独占性:任何时刻只能有且仅有一个线程持有
  • 高可用:
    • 若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
    • 高并发请求下,依旧性能好使
  • 防死锁:杜绝死锁,必须有超时控制机制或撤销操作,有个兜底终止跳出方案
  • 不乱抢:不能unlock别人的锁,只能自己加锁自己释放,自己的锁自己解
  • 重入性:同一个节点的同一个线程如果获取锁之后,它也可以再次获取这个锁

1.4、建立分布式锁

# 第一种
setnx key value
EXPIRE KEY 60

# 第二种
set key value [EX seconds][PX milliseconds][NX|XX]

使用setnx确保只有一个客户端能成功设置键,通过EXPIRE设置过期时间,防止死锁,但setnx + expire不安全,两条命令非原子性。可以通过 Lua脚本 来实现分布式锁。

-- 加锁
if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then
    redis.call('PEXPIRE', KEYS[1], ARGV[2])
    return 1
end

-- 释放锁
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

二、RedLock

官网地址:https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/

2.1、RedLock是什么?

ReadLock是一种算法,是一个更为规范的算法来使用Redis实现分布式锁,实现了比普通单实例方法更安全的DLM(分布式锁管理器)。

2.2、为什么使用RedLock?

在这里插入图片描述

在这里插入图片描述

导致问题:

    1. 主机宕机了,从机上位,变成新的master,用户依旧可以建锁成功,出现 一锁被多建多用 ,违法安全规定
    1. 多个线程获取到同一把锁,可能会导致各种预期之外的情况发生,比如脏数据

2.3、RedLock算法设计理念

在这里插入图片描述

大致方案如下:

假设有5个Redis主节点,不使用复制或任何其他隐式协调系统,为了获得锁客户端执行以下操作:

步骤 说明
1 获取当前时间,以毫秒为单位
2 依次尝试从5个实例,使用相同的key和随机值(如UUID)获取值,当向Redis请求获取值时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。这样可以防止客户端在试图与一个宕机的Redis节点对话时,长时间处于阻塞状态。如果一个实例不可用,客户端应该尽快尝试去另外一个Redis实例请求数据
3 客户端通过当前时间减去步骤1记录的时间 = 获取锁使用的时间。当且仅当从大多数(N/2+1)的Redis节点都渠道锁,并且获取锁使用的时间 < 锁失效时间时,锁才算获取成功
4 如果取到锁,其真正有效时间 = 初始有效时间 - 获取锁使用时间
5 如果由于某些原因未能获得锁(无法在至少N/2 + 1个Redis实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的Redis实例上进行解锁

注意:

客户端只有在满足以下两个条件时,才认为加锁成功

    1. 客户端从超过半数(>= N/2 + 1)的Redis实例上成功获取了锁
    1. 客户端获取锁的总耗时没有超过锁的有效时间

2.4、容错公式

N:最终部署机器数,X:容错机器数

N = 2X + 1

为什么是奇数?

从成本上来考虑,用最少的机器,达到最多的产出效果

三、实际操作

3.1、手写分布式锁

  • 注意点:
    • lock关键:
      • 加锁:在redis中,设置键,并设置过期时间
      • 自旋
      • 续期
    • unlock关键:不能unlock别人的锁,只能自己加锁自己释放,自己的锁自己解

3.2、利用redlock算法实现分布式锁

使用开源库redis-plus-plus: https://github.com/sewenew/redis-plus-plus

redlock代码路径:redis-plus-plus/src/sw/redis++/patterns/

类名 作用
RedLockUtils 工具类,提供ttl(计算时间差)和锁ID
RedMutexTx 基于Redis事务的分布式锁实现
RedMutex 作者推荐用户直接操作的分布式锁的类,根据配置选择使用脚本(RedLockMutex)或事务(RedMutexTx
RedLockMutex 针对单个Redis实例的分布式锁,使用脚本实现
RedLockMutexVessel 管理多个Redis实例的分布式锁,使用脚本实现
RedMutexOptions RedLock的配置选项,包括锁的TTL、重试延迟、是否使用脚本
RedMutexImpl 抽象基类
RedMutexImplTpl 模板类,继承自RedMutexImpl,根据模板参数(RedLockMutexRedMutexTx)实现具体操作
RedLock 是一个模板类,用于封装分布式锁的核心操作,根据模板参数(RedLockMutexRedMutexTx)实现具体操作
LockWatcher 用于监控锁的生命周期
/**     RedLock类       **/
/* 作者不是很推荐使用,更为推荐使用RedMutex,RedLock只是RedMutex的简单封装.
*/
template <typename RedisInstance>
class RedLock {
public:
    RedLock(RedisInstance &mut, std::defer_lock_t) : _mut(mut), _lock_val(RedLockUtils::lock_id()) {}

    ~RedLock() {
        if (owns_lock()) {
            unlock();
        }
    }

    // Try to acquire the lock for *ttl* milliseconds.
    // Returns how much time still left for the lock, i.e. lock validity time.
    bool try_lock(const std::chrono::milliseconds &ttl) {
        auto time_left = _mut.try_lock(_lock_val, ttl);
        if (time_left <= std::chrono::milliseconds(0)) {
            return false;
        }

        _release_tp = std::chrono::steady_clock::now() + time_left;

        return true;
    }

   ...
   ...
    void unlock() {
        try {
            _mut.unlock(_lock_val);
            _release_tp = std::chrono::time_point<std::chrono::steady_clock>{};
        } catch (const Error &) {
            _release_tp = std::chrono::time_point<std::chrono::steady_clock>{};
            throw;
        }
    }

    bool owns_lock() const {
        if (ttl() <= std::chrono::milliseconds(0)) {
            return false;
        }

        return true;
    }

    std::chrono::milliseconds ttl() const {
        auto t = std::chrono::steady_clock::now();
        return std::chrono::duration_cast<std::chrono::milliseconds>(_release_tp - t);
    }

private:
    RedisInstance &_mut;

    std::string _lock_val;

    // The time point that we must release the lock.
    std::chrono::time_point<std::chrono::steady_clock> _release_tp{};
};

/**         使用方法        **/
// 使用Lua脚本版本
{
    RedLockMutex mtx({redis1, redis2, redis3}, "resource");

    RedLock<RedLockMutex> lock(mtx, std::defer_lock);

   
    auto validity_time = lock.try_lock(std::chrono::seconds(30));

    validity_time = lock.extend_lock(std::chrono::seconds(10));

    lock.unlock();
} 

// Redis事务版本
{
    RedMutex mtx({redis1, redis2, redis3}, "resource");

    RedLock<RedMutex> lock(mtx, std::defer_lock);
    auto validity_time = lock.try_lock(std::chrono::seconds(30));
    validity_time = lock.extend_lock(std::chrono::seconds(30));

    lock.unlock();
}
/**     RedMutex类      **/
class RedMutex {
public:
    RedMutex(std::shared_ptr<Redis> master,
            const std::string &resource,
            std::function<void (std::exception_ptr)> auto_extend_err_callback = nullptr,
            const RedMutexOptions &opts = {},
            const std::shared_ptr<LockWatcher> &watcher = nullptr) :
        RedMutex(std::initializer_list<std::shared_ptr<Redis>>{master},
                resource, std::move(auto_extend_err_callback), opts, watcher) {}

    RedMutex(std::initializer_list<std::shared_ptr<Redis>> masters,
            const std::string &resource,
            std::function<void (std::exception_ptr)> auto_extend_err_callback = nullptr,
            const RedMutexOptions &opts = {},
            const std::shared_ptr<LockWatcher> &watcher = nullptr) :
        RedMutex(masters.begin(), masters.end(),
                resource, std::move(auto_extend_err_callback), opts, watcher) {}

    template <typename Input>
    RedMutex(Input first, Input last,
            const std::string &resource,
            std::function<void (std::exception_ptr)> auto_extend_err_callback = nullptr,
            const RedMutexOptions &opts = {},
            const std::shared_ptr<LockWatcher> &watcher = nullptr) {
        if (opts.scripting) {      //根据配置选项,选择脚本还是事务实现
            _mtx = std::make_shared<RedMutexImplTpl<RedLockMutex>>(first, last, resource,
                    std::move(auto_extend_err_callback), opts, watcher);
        } else {
            _mtx = std::make_shared<RedMutexImplTpl<RedMutexTx>>(first, last, resource,
                    std::move(auto_extend_err_callback), opts, watcher);
        }
    }

    ...

private:
    std::shared_ptr<RedMutexImpl> _mtx;
};

/**         使用方法        **/
auto redis = std::make_shared<Redis>("tcp://127.0.0.1");

auto redis1 = std::make_shared<Redis>("tcp://127.0.0.1:7000");
auto redis2 = std::make_shared<Redis>("tcp://127.0.0.1:7001");
auto redis3 = std::make_shared<Redis>("tcp://127.0.0.1:7002");

try {
    {
        // 单个实例
        RedMutex mtx(redis, "resource");
        std::lock_guard<RedMutex> lock(mtx);
    }

    {
        // 多个实例
        RedMutex mtx({redis1, redis2, redis3}, "resource");
        std::lock_guard<RedMutex> lock(mtx);
    }

    {
        RedMutexOptions opts;
        opts.ttl = std::chrono::seconds(5);

        auto watcher = std::make_shared<LockWatcher>();

        RedMutex mtx({redis1, redis2, redis3}, "resource",
                [](std::exception_ptr err) {
                    try {
                        std::rethrow_exception(err);
                    } catch (const Error &e) {
                    }
                },
                opts, watcher);

        std::unique_lock<RedMutex> lock(mtx, std::defer_lock);

        lock.lock();

        lock.unlock();

        lock.try_lock();
    }
} catch (const Error &err) {
}

先创建多个Redis独立的实例
先开启多个redis

创建多线程模拟多个客户端并发访问,模拟多个客户端并发操作。

/**     创建多线程模拟多个客户端并发访问      **/
std::unique_lock<RedMutex> lock(mtx, std::defer_lock);
auto client_thread = [&](string client_id, int task_count){
            try{
                    // 尝试获取锁(非阻塞版本)    
                    // if (!lock.try_lock()) {
                    //     cerr << "[" << client_id << "] Failed to acquire lock - operation skipped" << endl;
                    //     return;
                    // }

                    lock.lock();
                    cout << "[" << client_id << "] acquired lock\n";

                    // 模拟临界区操作
                    for (int i = 1; i <= task_count; ++i) {
                        process_order(client_id, i);
                    }

                    // 手动释放锁(析构时也会自动释放)
                    lock.unlock();
                    cout << "[" << client_id << "] released lock\n";
                
                    // 随机延迟后执行下一个任务
                    dis.param(uniform_int_distribution<>::param_type(50, 300));
                    this_thread::sleep_for(chrono::milliseconds(dis(gen)));
            }catch(const exception& e) {
                cerr << "[" << client_id << "] Error: " << e.what() << endl;
            }catch(const Error& e) {
                cerr << "Redis error: " << e.what() << endl;
            }
        };

        // 4. 创建多线程模拟多个客户端并发访问
    vector<thread> clients;
    vector<string> client_ids = {"ClientA", "ClientB", "ClientC"};
    for (const auto& id : client_ids) {
        clients.emplace_back(client_thread, id, 5);
    }

注意: try_lock() 非阻塞版本,lock() 阻塞版本, 尽量别混合使用,否则会死锁。
在这里插入图片描述


正常运行结果:

在这里插入图片描述

四、总结

4.1、什么是分布式锁?

是一种在分布式系统中协调多个进程/服务对共享资源进行互斥访问的机制;确保在任意时刻,只有一个客户端可以访问资源。

4.2、分布式锁和常见的锁有什么区别?

锁类型 作用范围 实现方式 性能特点 典型应用场景
分布式锁 跨进程,跨机器 外部存储系统(如Redis、Zookeeper等) 网络开销大 多服务共享资源访问
互斥锁 单进程内-单机锁 原子操作 低延迟,高效率 多线程共享内存访问
读写锁 单进程内-单机锁 计数器+条件变量 读操作并发性好 读多写少的共享数据
自旋锁 单进程内-单机锁 CPU忙等待循环 低延迟,但浪费CPU 短时间占用资源场景

4.3、RedLock分布式锁的数据存储与高可用性分析

    1. RedLock算法,将每个节点看作是独立的,即没有主从关系,每个节点都可以独立地获取和释放锁,并且它们之间没有任何数据同步。
    1. RedLock算法本身不存储业务数据,它只负责管理分布式锁的状态。
    1. 部分节点宕机,只要满足 N/2+1 节点可用,锁服务仍然正常;宕机节点上的锁会在TTL过期后自动清理。

4.4、RedLock适用场景

个人认为,它不适用于需要强一致性的场景,只是在某些时间段,并发量突发时,避免超卖这类问题,比如双11,618等活动,常规时间段还是还原成集群部署模式,使用单机锁,保证数据的一致性。

Code
0vice·GitHub


网站公告

今日签到

点亮在社区的每一天
去签到