深入浅出分布式限流(更新中)

发布于:2025-07-11 ⋅ 阅读:(16) ⋅ 点赞:(0)

一、什么是分布式限流

分布式限流是一种在分布式系统架构下,对跨多个服务节点或实例的全局请求总量或速率进行统一协调和控制的机制。它的核心目标是防止整个系统被突发的高流量压垮,保障系统的稳定性、可用性和公平性。

二、为什么需要分布式限流?

  1. 单机限流的局限性:
    • 在分布式系统中,服务通常部署在多台服务器(节点)上。
    • 单机限流只在单个节点内部生效,只能控制到达该节点自身的流量。
    • 问题: 当流量通过负载均衡器分散到多个节点时:
      • 每个节点都允许自己的最大流量(例如 100 QPS),那么整个集群理论上可以承受 节点数 * 单节点限流值(例如 10 节点 * 100 = 1000 QPS)的流量。
      • 如果下游资源(如数据库、缓存、第三方接口)的承受能力低于这个总值(例如数据库只能承受 500 QPS),那么即使每个节点都没超限,下游资源也会被过载的全局总流量击垮。
      • 流量可能不均匀分配,某些节点可能先达到限流,而其他节点还很空闲,导致整体资源利用不充分且部分用户被错误限流。
  2. 分布式限流的必要性:
    • 保护共享资源: 确保对数据库、缓存、消息队列、外部 API 等共享资源的访问总量不超过其承受能力。
    • 全局视角: 从整个系统或服务的角度设定一个全局的流量阈值。
    • 公平性: 在多个客户端或多个入口点访问同一资源时,提供更公平的流量控制。
    • 应对突发流量: 有效平滑掉分布式场景下可能出现的全局突发流量洪峰。

举个例子:

想象一下:一个超级热门的网红餐厅(这就是你的整个系统)。

  1. 问题:
    • 餐厅很大,有很多张桌子(多台服务器/节点)。
    • 如果每个服务员(单个服务器)只管自己负责的几张桌子,只限制自己负责区域进来的客人数量(单机限流),比如每个服务员最多同时服务5桌。
    • 看起来没问题?但麻烦来了:餐厅的厨房(核心资源,比如数据库) 只有那么大,最多只能同时给20桌炒菜!
    • 如果一下子涌进来100个客人,被分配到不同的服务员区域。每个服务员都觉得:“我这才进来3桌/4桌客人,还没到我上限5桌呢,放行!” 结果,同时有30桌甚至40桌客人点了菜,都涌向厨房。
    • 后果: 厨房彻底崩溃了!厨师忙不过来,菜出不来,所有客人(所有请求)都卡住、等待,甚至餐厅(系统)直接瘫痪关门。

单机限流的痛点: 每个服务员(服务器)只顾自己眼前的一亩三分地,不知道整个餐厅(全局)的压力,尤其不知道厨房(共享资源)的极限在哪里。结果就是厨房被压垮。

分布式限流算法

漏桶算法
基本原理

漏桶算法是一种流量整形技术,核心思想是将请求看作"水滴",系统是一个底部有漏洞的桶:

  1. 水滴(请求)以任意速率进入桶中
  2. 桶底部的漏洞以恒定速率漏水(处理请求)
  3. 当桶满时(水量超过容量),新水滴会溢出(拒绝请求)

关键特性:

  • 输出速率恒定:无论输入多快,处理速率始终固定
  • 强制平滑流量:突发流量被整形为稳定输出
  • 队列管理:桶内水量相当于待处理请求队列
举例说明

假设设置漏桶:

  • 容量:10个请求
  • 漏水速率:2个请求/秒

场景:

  1. 0秒时:突发10个请求 → 桶满(接受10个)
  2. 第1秒:新到5个请求 → 桶已满(拒绝5个)
  3. 每秒漏出2个请求:
    • 第1秒末:桶剩8个(处理2个)
    • 第2秒末:桶剩6个
    • …直到第5秒末:桶空

结果:10个突发请求被平滑成5秒内处理完(每秒2个),超限请求被拒绝。

技术实现

分布式漏桶需要解决三个核心问题:

  1. 水位计算:实时跟踪桶内水量
  2. 漏水模拟:根据时间流逝减少水量
  3. 原子操作:并发请求下的数据一致性

实现方案:

代码示例(Java+Redis)
import redis.clients.jedis.Jedis;
import java.time.Instant;

public class DistributedLeakyBucket {

    private final String redisKey;
    private final long capacity;     // 桶容量
    private final long leakRate;     // 漏水速率(请求/秒)
    private final Jedis jedis;       // Redis连接

    public DistributedLeakyBucket(String redisKey, long capacity, long leakRate, Jedis jedis) {
        this.redisKey = redisKey;
        this.capacity = capacity;
        this.leakRate = leakRate;
        this.jedis = jedis;
    }

    // Lua脚本保证原子操作
    private static final String LUA_SCRIPT =
        "local key = KEYS[1] " +
        "local capacity = tonumber(ARGV[1]) " +
        "local leakRate = tonumber(ARGV[2]) " +
        "local now = tonumber(ARGV[3]) " +
        
        "local bucket = redis.call('hmget', key, 'waterLevel', 'lastLeakTime') " +
        "local waterLevel = tonumber(bucket[1]) or 0 " +
        "local lastLeakTime = tonumber(bucket[2]) or now " +
        
        // 计算漏水量:(当前时间 - 上次漏水时间) * 漏水速率
        "local elapsed = now - lastLeakTime " +
        "local leakedAmount = math.floor(elapsed * leakRate) " +
        
        // 更新水位
        "waterLevel = math.max(0, waterLevel - leakedAmount) " +
        "lastLeakTime = now " +
        
        // 检查桶容量
        "if waterLevel < capacity then " +
        "   waterLevel = waterLevel + 1 " +  // 允许请求
        "   redis.call('hmset', key, 'waterLevel', waterLevel, 'lastLeakTime', lastLeakTime) " +
        "   return 1 " +  // 返回成功
        "else " +
        "   redis.call('hmset', key, 'waterLevel', waterLevel, 'lastLeakTime', lastLeakTime) " +
        "   return 0 " +  // 返回失败
        "end";

    public boolean tryAcquire() {
        long now = Instant.now().getEpochSecond(); // 当前时间戳(秒)
        
        // 执行Lua脚本
        Object result = jedis.eval(
            LUA_SCRIPT,
            1, // KEY数量
            redisKey,
            String.valueOf(capacity),
            String.valueOf(leakRate),
            String.valueOf(now)
        );
        
        return ((Long) result) == 1L;
    }
}

使用示例

public static void main(String[] args) {
    // 创建Redis连接
    Jedis jedis = new Jedis("localhost", 6379);
    
    // 初始化漏桶:容量10,速率2请求/秒
    DistributedLeakyBucket bucket = new DistributedLeakyBucket(
        "api_bucket", 10, 2, jedis
    );

    // 模拟请求
    for (int i = 1; i <= 15; i++) {
        boolean allowed = bucket.tryAcquire();
        System.out.println("请求" + i + ": " + 
            (allowed ? "✓ 通过" : "✗ 拒绝"));
        
        try {
            Thread.sleep(300); // 模拟请求间隔
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果

请求1: ✓ 通过
请求2: ✓ 通过
...(前10个请求快速通过)
请求11: ✗ 拒绝
请求12: ✗ 拒绝
请求13: ✓ 通过  // 1秒后漏出2个空位
请求14: ✓ 通过
请求15: ✗ 拒绝
适用场景

漏桶算法最适合需要严格保护下游系统的场景:

  1. 数据库保护
    • 防止突发查询击穿数据库
    • 示例:capacity=100, leakRate=50/s 确保DB每秒最多50查询
  2. 第三方API调用
    • 遵守严格的速率限制
    • 示例:外部API限制100次/分钟 → leakRate=100/60≈1.66/s
  3. 支付系统
    • 避免银行接口被突发交易冲垮
    • 示例:支付网关设置leakRate=30/s平滑处理交易
  4. 日志处理系统
    • 防止日志洪水压垮存储
    • 示例:ES写入限速leakRate=500/s
  5. 硬件设备通信
    • 控制IoT设备指令下发速率
    • 示例:设备最大处理能力leakRate=200/s
令牌桶算法
基本原理

令牌桶算法是一种流量整形速率限制机制,其核心原理是:

  1. 系统以固定速率向桶中添加令牌(token)
  2. 桶有最大容量,令牌满时不再添加
  3. 每个请求需要获取一个令牌才能执行
  4. 请求到达时:
    • 桶中有足够令牌 → 立即执行并扣除令牌
    • 桶中令牌不足 → 请求被限流(等待或拒绝)
举例说明

假设某API限流规则:每秒最多处理10个请求(桶容量=10,添加速率=10个/秒):

  1. 突发流量:当15个请求同时到达时
    • 前10个请求立即获得令牌处理
    • 剩余5个请求被限流
  2. 后续请求:每0.1秒新增1个令牌
    • 新请求到达时若有令牌则处理
    • 无令牌则继续等待或被拒绝
技术实现

关键组件:

  1. 桶容量:允许的突发请求最大值
  2. 令牌添加速率:每秒添加的令牌数
  3. 当前令牌数:动态变化的可用令牌
  4. 最后更新时间:计算新增令牌

线程安全实现要点:

  • 使用同步机制(如synchronized
  • 精确计算时间差
  • 避免令牌超发
代码示例(Java实现)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;

public class TokenBucketRateLimiter {
    private final JedisPool jedisPool;
    private final String keyPrefix;
    private final int capacity;
    private final double rate;
    private String luaScriptSha;

    public TokenBucketRateLimiter(JedisPool jedisPool, String keyPrefix, 
                                  int capacity, double rate) {
        this.jedisPool = jedisPool;
        this.keyPrefix = keyPrefix;
        this.capacity = capacity;
        this.rate = rate;
        loadLuaScript();
    }

    private void loadLuaScript() {
        try (Jedis jedis = jedisPool.getResource()) {
            // 从文件加载Lua脚本
            String script = new String(
                Files.readAllBytes(Paths.get("token_bucket.lua")),
                StandardCharsets.UTF_8
            );
            luaScriptSha = jedis.scriptLoad(script);
        } catch (IOException e) {
            throw new RuntimeException("Failed to load Lua script", e);
        }
    }

    public boolean tryAcquire() {
        return tryAcquire(1);
    }

    public boolean tryAcquire(int requestedTokens) {
        try (Jedis jedis = jedisPool.getResource()) {
            String tokensKey = keyPrefix + ":tokens";
            String timestampKey = keyPrefix + ":ts";
            long now = Instant.now().getEpochSecond();

            // 执行Lua脚本
            Object result = jedis.evalsha(
                luaScriptSha,
                2,  // KEYS数量
                tokensKey,
                timestampKey,
                String.valueOf(capacity),
                String.valueOf(rate),
                String.valueOf(now),
                String.valueOf(requestedTokens)
            );

            // 解析结果 [是否允许, 剩余令牌]
            @SuppressWarnings("unchecked")
            java.util.List<Long> list = (java.util.List<Long>) result;
            return list.get(0) == 1;
        }
    }
}
local tokens_key = KEYS[1]        -- 令牌数量键
local timestamp_key = KEYS[2]     -- 最后刷新时间键

local capacity = tonumber(ARGV[1]) -- 桶容量
local rate = tonumber(ARGV[2])    -- 令牌生成速率(令牌/秒)
local now = tonumber(ARGV[3])     -- 当前时间戳(秒)
local requested = tonumber(ARGV[4]) -- 请求令牌数(默认1)

-- 获取当前桶状态
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

-- 计算时间差和应生成令牌数
local delta = math.max(0, now - last_refreshed)
local new_tokens = math.min(capacity, last_tokens + delta * rate)

-- 检查令牌是否足够
local allowed = new_tokens >= requested
local allowed_tokens = new_tokens

if allowed then
  allowed_tokens = new_tokens - requested
end

-- 更新存储 (仅当时间变化时更新时间戳)
redis.call("set", tokens_key, allowed_tokens)
if delta > 0 then
  redis.call("set", timestamp_key, now)
end

-- 返回结果: [是否允许, 剩余令牌数]
return { allowed and 1 or 0, allowed_tokens }

使用示例

public class RateLimiterExample {
    public static void main(String[] args) {
        JedisPool jedisPool = new JedisPool("localhost", 6379);
        String resourceKey = "api:user_profile"; // 限流资源标识
        
        // 创建限流器: 容量10令牌,每秒生成2令牌
        TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(
            jedisPool, resourceKey, 10, 2.0
        );

        // 模拟请求
        for (int i = 1; i <= 15; i++) {
            boolean allowed = limiter.tryAcquire();
            System.out.println("Request " + i + ": " + 
                (allowed ? "允许" : "被限流"));
            
            try {
                Thread.sleep(200); // 200毫秒请求间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        jedisPool.close();
    }
}
适用场景
  1. 网络流量控制:路由器/防火墙的带宽限制

  2. 资源分配系统:数据库连接池管理

  3. 突发流量处理:允许短时间内超过平均速率

    • 优势:相比固定窗口算法更适应流量峰值

    关键优势:允许突发流量 + 精确控制长期平均速率,比漏桶算法更灵活应对突发请求。

固定窗口算法
滑动窗口算法

网站公告

今日签到

点亮在社区的每一天
去签到