架构思维:重温限流算法原理与实战

发布于:2025-09-08 ⋅ 阅读:(23) ⋅ 点赞:(0)

在这里插入图片描述

引言:限流——系统稳定的守护神

在当今高并发、高流量的互联网应用中,限流已成为保障系统稳定性的核心手段之一。无论是双11购物狂欢节、明星公布恋情的微博爆炸,还是突发热点事件,系统都可能面临远超设计容量的流量冲击。如何在流量洪峰中保持系统可用性,防止雪崩效应,是每个架构师必须面对的挑战。

本文将深入剖析限流的四大核心算法:计数器、滑动窗口、漏桶和令牌桶,不仅讲解理论原理,更提供完整的代码实现和实战经验,助你打造坚如磐石的高可用系统。


一、为什么要限流?

1.1 限流的本质

限流(Rate Limiting)是在保证系统基本可用的前提下,限制进入系统的请求量,防止系统被突发流量冲垮。其核心思想是:在系统容量范围内,尽可能多地处理请求,超出部分则进行排队或拒绝

1.2 限流的典型场景

  • 秒杀抢购:短时间内涌入远超系统处理能力的请求
  • 热点事件:明星公布恋情等突发热点导致流量激增
  • 系统保护:防止自身系统或下游系统被过载请求压垮
  • 资源分配:公平分配系统资源,防止个别用户/服务占用过多资源

1.3 一个真实案例

以微博为例:某明星公布恋情,访问量从平时的50万骤增至500万,而系统设计容量仅支持200万访问。若不进行限流,服务器将不堪重负而崩溃,导致所有用户无法访问。通过合理限流,系统可保持基本可用状态,仅部分用户会收到友好提示。


二、限流算法全景图

限流算法主要分为四类:计数器算法滑动窗口算法漏桶算法令牌桶算法。每种算法各有特点,适用于不同场景。


三、计数器算法:简单直接的限流方案

3.1 算法原理

计数器算法是在一个时间窗口内(如1分钟)设置一个固定的请求计数上限,当请求量超过上限时,后续请求将被拒绝。

核心特点

  • 时间窗口固定
  • 请求计数达到上限后拒绝后续请求
  • 时间窗口结束后重置计数器

计数器算法原理图

3.2 代码实现

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CounterLimiter {
    // 起始时间
    private static long startTime = System.currentTimeMillis();
    // 时间窗口大小(毫秒)
    private static long interval = 1000;
    // 每秒最大请求数
    private static long maxCount = 2;
    // 请求计数器
    private static AtomicLong accumulator = new AtomicLong();

    /**
     * 尝试获取请求许可
     * @param taskId 任务ID(用于日志追踪)
     * @param turn 当前轮次
     * @return >0 表示允许通过的请求数,<=0 表示被拒绝的请求数
     */
    private static long tryAcquire(long taskId, int turn) {
        long nowTime = System.currentTimeMillis();
        // 在当前时间窗口内
        if (nowTime < startTime + interval) {
            long count = accumulator.incrementAndGet();
            if (count <= maxCount) {
                return count;
            } else {
                return -count;
            }
        } else {
            // 时间窗口已过期,重置计数器
            synchronized (CounterLimiter.class) {
                log.info("新时间窗口开始, taskId={}, turn={}", taskId, turn);
                // 再次检查,防止重复初始化
                if (nowTime > startTime + interval) {
                    accumulator.set(0);
                    startTime = nowTime;
                }
            }
            return 0;
        }
    }

    public static void main(String[] args) {
        // 被限制的请求数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程执行轮数
        final int turns = 20;
        // 线程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        
        ExecutorService pool = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < turns; j++) {
                        long taskId = Thread.currentThread().getId();
                        long index = tryAcquire(taskId, j);
                        if (index <= 0) {
                            // 被限制的请求数增加
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        float time = (System.currentTimeMillis() - start) / 1000F;
        // 输出统计结果
        log.info("限制的次数为:{},通过的次数为:{}", 
                limited.get(), (threads * turns - limited.get()));
        log.info("限制的比例为:{}", (float) limited.get() / (float) (threads * turns));
        log.info("运行的时长为:{}秒", time);
    }
}

3.3 临界问题:致命缺陷

计数器算法存在严重的临界问题:当请求集中在时间窗口边界时,可能导致实际流量超出限制。

计数器临界问题

如图所示:用户在0:59秒发送100个请求,1:00秒又发送100个请求,实际在1秒内发送了200个请求,而系统限制为1分钟100个请求(约每秒1.7个)。这种边界情况可能导致系统被突发流量冲垮。


四、滑动窗口算法:弥补计数器缺陷的进阶方案

4.1 为什么需要滑动窗口?

在前文中,我们已经了解到计数器算法存在严重的临界问题:当请求集中在时间窗口边界时,可能导致实际流量超出限制。虽然漏桶令牌桶算法能够解决这个问题,但它们实现相对复杂,且各有局限。

滑动窗口算法作为计数器算法的改进版,既保留了计数器算法的简单性,又解决了临界问题,是限流算法中一个非常实用的中间选择。

4.2 算法原理

滑动窗口算法将一个大时间窗口(如1分钟)划分为多个小时间窗口(如60个1秒的小窗口),每个小窗口独立计数。当时间窗口滑动时,只移除最旧的小窗口计数,加入新的小窗口计数,从而实现更精确的限流控制。

核心特点

  • 将大时间窗口划分为N个小窗口
  • 每个小窗口独立计数
  • 时间窗口滑动时,移除最旧小窗口计数,加入新小窗口
  • 当前窗口总请求数 = 所有小窗口请求数之和

4.3 滑动窗口 vs 固定窗口

特性 固定窗口计数器 滑动窗口
实现复杂度 简单 中等
临界问题 存在严重临界问题 解决了临界问题
流量分布 可能导致流量不均匀 流量分布更均匀
资源消耗 中(需要维护多个小窗口)
突发流量处理 不能处理边界突发流量 能更好地处理突发流量

4.4 代码实现

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;

@Slf4j
public class SlidingWindowLimiter {
    // 时间窗口总大小(毫秒)
    private final long windowSize;
    // 小窗口数量
    private final int subWindowCount;
    // 每个小窗口的大小(毫秒)
    private final long subWindowSize;
    // 每个窗口的最大请求数
    private final long maxRequests;
    // 小窗口数组,每个元素代表一个小窗口的请求数
    private final AtomicLongArray subWindowRequests;
    // 当前窗口的起始时间
    private final AtomicLong startTime;
    // 用于线程安全的锁
    private final Lock lock = new ReentrantLock();

    /**
     * 构造函数
     * @param windowSize 时间窗口总大小(毫秒)
     * @param subWindowCount 小窗口数量
     * @param maxRequests 每个窗口的最大请求数
     */
    public SlidingWindowLimiter(long windowSize, int subWindowCount, long maxRequests) {
        this.windowSize = windowSize;
        this.subWindowCount = subWindowCount;
        this.subWindowSize = windowSize / subWindowCount;
        this.maxRequests = maxRequests;
        this.subWindowRequests = new AtomicLongArray(subWindowCount);
        this.startTime = new AtomicLong(System.currentTimeMillis());
    }

    /**
     * 尝试获取请求许可
     * @return true 表示被限流,false 表示允许通过
     */
    public boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        long currentWindowStart = currentTime - windowSize;
        
        try {
            lock.lock();
            // 检查是否需要滑动窗口
            if (currentTime - startTime.get() > windowSize) {
                // 计算需要滑动多少个小窗口
                int slideCount = (int) ((currentTime - startTime.get()) / subWindowSize);
                slideCount = Math.min(slideCount, subWindowCount);
                
                // 滑动窗口:清除最旧的小窗口数据
                for (int i = 0; i < slideCount; i++) {
                    int index = (i + subWindowCount) % subWindowCount;
                    subWindowRequests.set(index, 0);
                }
                
                // 更新窗口起始时间
                startTime.addAndGet(slideCount * subWindowSize);
            }
            
            // 计算当前请求所属的小窗口索引
            int index = (int) ((currentTime - startTime.get()) / subWindowSize) % subWindowCount;
            // 增加当前小窗口的请求数
            subWindowRequests.incrementAndGet(index);
            
            // 计算当前窗口的总请求数
            long totalRequests = 0;
            for (int i = 0; i < subWindowCount; i++) {
                totalRequests += subWindowRequests.get(i);
            }
            
            // 检查是否超过限制
            if (totalRequests > maxRequests) {
                // 超过限制,回滚当前请求的计数
                subWindowRequests.decrementAndGet(index);
                return true;
            }
            
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 获取当前窗口的总请求数
     */
    public long getCurrentRequests() {
        long totalRequests = 0;
        for (int i = 0; i < subWindowCount; i++) {
            totalRequests += subWindowRequests.get(i);
        }
        return totalRequests;
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建滑动窗口限流器:1秒窗口,10个小窗口,每秒最多10个请求
        SlidingWindowLimiter limiter = new SlidingWindowLimiter(1000, 10, 10);
        
        // 被限制的请求数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程执行轮数
        final int turns = 20;
        // 线程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        
        ExecutorService pool = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < turns; j++) {
                        boolean intercepted = limiter.tryAcquire();
                        if (intercepted) {
                            limited.getAndIncrement();
                        }
                        Thread.sleep(100); // 模拟100ms的请求间隔
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        
        countDownLatch.await();
        float time = (System.currentTimeMillis() - start) / 1000F;
        
        // 输出统计结果
        log.info("限制的次数为:{},通过的次数为:{}", 
                limited.get(), (threads * turns - limited.get()));
        log.info("限制的比例为:{}", (float) limited.get() / (float) (threads * turns));
        log.info("运行的时长为:{}秒", time);
        log.info("当前窗口总请求数:{}", limiter.getCurrentRequests());
    }
}

4.5 滑动窗口的变种:加权滑动窗口

在某些场景下,我们希望最近的请求具有更高的权重,可以采用加权滑动窗口算法:

/**
 * 加权滑动窗口限流器
 * 最近的窗口权重更高,能更快响应流量变化
 */
@Slf4j
public class WeightedSlidingWindowLimiter {
    private final long windowSize;
    private final int subWindowCount;
    private final long subWindowSize;
    private final long maxRequests;
    private final AtomicLongArray subWindowRequests;
    private final AtomicLong startTime;
    // 权重数组,最近的窗口权重更高
    private final double[] weights;
    
    public WeightedSlidingWindowLimiter(long windowSize, int subWindowCount, long maxRequests) {
        this.windowSize = windowSize;
        this.subWindowCount = subWindowCount;
        this.subWindowSize = windowSize / subWindowCount;
        this.maxRequests = maxRequests;
        this.subWindowRequests = new AtomicLongArray(subWindowCount);
        this.startTime = new AtomicLong(System.currentTimeMillis());
        
        // 初始化权重,最近的窗口权重更高
        this.weights = new double[subWindowCount];
        double totalWeight = 0;
        for (int i = 0; i < subWindowCount; i++) {
            weights[i] = i + 1; // 越近的窗口权重越高
            totalWeight += weights[i];
        }
        // 归一化权重
        for (int i = 0; i < subWindowCount; i++) {
            weights[i] /= totalWeight;
        }
    }

    public boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        long currentWindowStart = currentTime - windowSize;
        
        try {
            lock.lock();
            // 检查是否需要滑动窗口
            if (currentTime - startTime.get() > windowSize) {
                int slideCount = (int) ((currentTime - startTime.get()) / subWindowSize);
                slideCount = Math.min(slideCount, subWindowCount);
                
                for (int i = 0; i < slideCount; i++) {
                    int index = (i + subWindowCount) % subWindowCount;
                    subWindowRequests.set(index, 0);
                }
                
                startTime.addAndGet(slideCount * subWindowSize);
            }
            
            int index = (int) ((currentTime - startTime.get()) / subWindowSize) % subWindowCount;
            subWindowRequests.incrementAndGet(index);
            
            // 计算加权总请求数
            double weightedRequests = 0;
            for (int i = 0; i < subWindowCount; i++) {
                int actualIndex = (index - i + subWindowCount) % subWindowCount;
                weightedRequests += subWindowRequests.get(actualIndex) * weights[i];
            }
            
            // 检查是否超过限制
            if (weightedRequests > maxRequests) {
                subWindowRequests.decrementAndGet(index);
                return true;
            }
            
            return false;
        } finally {
            lock.unlock();
        }
    }
}

4.6 滑动窗口的优缺点

优点

  1. 解决临界问题:相比固定窗口计数器,能有效避免边界突发流量问题
  2. 实现相对简单:比漏桶和令牌桶算法实现更简单直观
  3. 灵活性高:可通过调整小窗口数量平衡精确度和资源消耗
  4. 流量分布更均匀:限流效果更平滑,避免流量突变

缺点

  1. 资源消耗略高:需要维护多个小窗口的计数器
  2. 无法应对突发流量:与漏桶算法类似,不能像令牌桶那样处理突发流量
  3. 精确度有限:小窗口数量越多,精确度越高,但资源消耗也越大

4.7 滑动窗口在实际系统中的应用

4.7.1 Sentinel中的滑动窗口实现

阿里巴巴开源的Sentinel流量控制组件就采用了滑动窗口实现:

// Sentinel中的滑动窗口核心实现
public class SlidingWindowLeapArray extends LeapArray<WindowWrap<MetricBucket>> {
    // 每个窗口的统计数据
    private final AtomicReferenceArray<WindowWrap<MetricBucket>> array;
    
    // 获取当前窗口
    public WindowWrap<MetricBucket> currentWindow(long time) {
        // 计算当前窗口的索引
        int idx = calculateTimeIdx(time);
        // 获取当前窗口
        WindowWrap<MetricBucket> old = array.get(idx);
        
        // 如果窗口不存在或已过期,创建新窗口
        if (old == null || time - old.windowStart() >= windowLengthInMs) {
            WindowWrap<MetricBucket> window = new WindowWrap<>(windowLengthInMs, time - time % windowLengthInMs, new MetricBucket());
            if (array.compareAndSet(idx, old, window)) {
                return window;
            }
        }
        
        return array.get(idx);
    }
}

4.7.2 Redis实现分布式滑动窗口

在分布式系统中,可以使用Redis实现滑动窗口限流:

-- Redis Lua脚本实现滑动窗口限流
-- KEYS[1]: 限流key
-- ARGV[1]: 窗口大小(毫秒)
-- ARGV[2]: 最大请求数
-- ARGV[3]: 当前时间戳(毫秒)

local key = KEYS[1]
local windowSize = tonumber(ARGV[1])
local maxRequests = tonumber(ARGV[2])
local currentTime = tonumber(ARGV[3])
local minTime = currentTime - windowSize

-- 移除过期的请求记录
redis.call('ZREMRANGEBYSCORE', key, 0, minTime)

-- 添加当前请求
redis.call('ZADD', key, currentTime, currentTime)

-- 设置key的过期时间,比窗口稍长
redis.call('EXPIRE', key, math.floor(windowSize/1000) + 1)

-- 获取当前窗口内的请求数
local requestCount = tonumber(redis.call('ZCARD', key))

-- 检查是否超过限制
if requestCount > maxRequests then
    return 0  -- 被限流
else
    return 1  -- 允许通过
end

五、漏桶算法:稳定输出的流量整形器

5.1 算法原理

漏桶算法将请求比作水,流入漏桶,然后以固定速率流出。当流入速度过快,桶满后多余的水(请求)将被丢弃。

核心特点

  • 请求以任意速率流入
  • 以固定速率处理请求
  • 桶容量固定,超出容量的请求被拒绝
  • 能平滑突发流量,但无法应对突发流量

漏桶算法原理图

5.2 代码实现

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class LeakBucketLimiter {
    // 上一次出水时间
    private static long lastOutTime = System.currentTimeMillis();
    // 漏水速率(每秒处理请求数)
    private static int leakRate = 2;
    // 桶容量
    private static int capacity = 2;
    // 当前水量
    private static AtomicInteger water = new AtomicInteger(0);

    /**
     * 判断请求是否被限流
     * @param taskId 任务ID
     * @param turn 当前轮次
     * @return true 表示被限流,false 表示允许通过
     */
    public static synchronized boolean isLimit(long taskId, int turn) {
        // 如果是空桶,设置当前时间为漏出时间
        if (water.get() == 0) {
            lastOutTime = System.currentTimeMillis();
            water.addAndGet(1);
            return false;
        }
        
        // 计算漏出的水量
        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
        // 计算剩余水量
        int waterLeft = water.get() - waterLeaked;
        water.set(Math.max(0, waterLeft));
        // 更新漏出时间
        lastOutTime = System.currentTimeMillis();
        
        // 尝试加水,如果未满则允许通过
        if (water.get() < capacity) {
            water.addAndGet(1);
            return false;
        } else {
            // 桶已满,拒绝请求
            return true;
        }
    }

    public static void main(String[] args) {
        // 被限制的请求数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程执行轮数
        final int turns = 20;
        // 线程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        
        ExecutorService pool = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < turns; j++) {
                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimit(taskId, j);
                        if (intercepted) {
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        float time = (System.currentTimeMillis() - start) / 1000F;
        // 输出统计结果
        log.info("限制的次数为:{},通过的次数为:{}", 
                limited.get(), (threads * turns - limited.get()));
        log.info("限制的比例为:{}", (float) limited.get() / (float) (threads * turns));
        log.info("运行的时长为:{}秒", time);
    }
}

5.3 漏桶算法的局限性

  1. 无法应对突发流量:出水速率固定,即使系统有额外处理能力也无法利用
  2. 后端能力提升受限:当通过动态扩容提升系统处理能力时,漏桶无法自动适应
  3. 资源利用率不高:在非高峰期,系统处理能力可能未被充分利用

六、令牌桶算法:灵活应对突发流量

6.1 算法原理

令牌桶算法以固定速率向桶中添加令牌,请求需要获取令牌才能被处理。当桶中无令牌时,请求被拒绝。

核心特点

  • 以固定速率生成令牌
  • 请求需要获取令牌才能被处理
  • 桶容量固定,超出容量的令牌不再添加
  • 能应对突发流量,允许短时间内的高流量

令牌桶算法原理图

6.2 代码实现

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class TokenBucketLimiter {
    // 上一次令牌发放时间
    private long lastTime = System.currentTimeMillis();
    // 桶容量
    private int capacity = 2;
    // 令牌生成速率(每秒)
    private int rate = 2;
    // 当前令牌数量
    private AtomicInteger tokens = new AtomicInteger(0);

    /**
     * 判断请求是否被限流
     * @param taskId 任务ID
     * @param applyCount 申请令牌数量
     * @return true 表示被限流,false 表示允许通过
     */
    public synchronized boolean isLimited(long taskId, int applyCount) {
        long now = System.currentTimeMillis();
        // 计算时间间隔(毫秒)
        long gap = now - lastTime;
        // 计算时间段内生成的令牌数
        int reverse_permits = (int) (gap * rate / 1000);
        int all_permits = tokens.get() + reverse_permits;
        // 更新当前令牌数量(不超过容量)
        tokens.set(Math.min(capacity, all_permits));
        log.info("当前令牌数:{},桶容量:{},时间间隔:{}ms", tokens, capacity, gap);
        
        if (tokens.get() < applyCount) {
            // 令牌不足,拒绝请求
            return true;
        } else {
            // 令牌充足,获取令牌
            tokens.getAndAdd(-applyCount);
            lastTime = now;
            return false;
        }
    }

    public static void main(String[] args) {
        TokenBucketLimiter limiter = new TokenBucketLimiter();
        
        // 被限制的请求数
        AtomicInteger limited = new AtomicInteger(0);
        // 线程数
        final int threads = 2;
        // 每条线程执行轮数
        final int turns = 20;
        // 线程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        
        ExecutorService pool = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    for (int j = 0; j < turns; j++) {
                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = limiter.isLimited(taskId, 1);
                        if (intercepted) {
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        float time = (System.currentTimeMillis() - start) / 1000F;
        // 输出统计结果
        log.info("限制的次数为:{},通过的次数为:{}", 
                limited.get(), (threads * turns - limited.get()));
        log.info("限制的比例为:{}", (float) limited.get() / (float) (threads * turns));
        log.info("运行的时长为:{}秒", time);
    }
}

6.3 令牌桶的优势

  1. 支持突发流量:允许在短时间内处理超过平均速率的请求
  2. 适应系统能力变化:可通过调整令牌生成速率快速响应系统扩容
  3. 资源利用更高效:在系统空闲时积累令牌,高峰期可处理更多请求

七、Guava RateLimiter:令牌桶的优雅实现

Google Guava库提供了RateLimiter类,实现了两种令牌桶算法:

  • SmoothBursty:平滑突发限流,允许一定程度的突发流量
  • SmoothWarmingUp:平滑预热限流,适用于需要预热的系统
import com.google.common.util.concurrent.RateLimiter;

public class GuavaRateLimiterDemo {
    public static void main(String[] args) {
        // 创建每秒2个令牌的限流器
        RateLimiter rateLimiter = RateLimiter.create(2.0);
        
        // 尝试获取1个令牌,如果没有足够令牌则等待
        double waitTime = rateLimiter.acquire();
        System.out.println("获取1个令牌耗时:" + waitTime + "秒");
        
        // 尝试获取3个令牌
        waitTime = rateLimiter.acquire(3);
        System.out.println("获取3个令牌耗时:" + waitTime + "秒");
    }
}

八、Nginx限流实战:网关层的流量控制

8.1 基本配置

Nginx提供limit_req_zonelimit_req指令实现漏桶算法限流:

# 定义限流区域
# $arg_sku_id 从URL参数中提取sku_id
# zone=skuzone:10m 定义10MB的共享内存区域
# rate=6r/m 限制为每分钟6个请求
limit_req_zone $arg_sku_id zone=skuzone:10m rate=6r/m;
limit_req_zone $http_user_id zone=userzone:10m rate=6r/m;
limit_req_zone $binary_remote_addr zone=perip:10m rate=6r/m;
limit_req_zone $server_name zone=perserver:1m rate=10r/s;

8.2 限流规则配置

# 按SKU ID限流
location = /ratelimit/sku {
    limit_req zone=skuzone;
    echo "正常的响应";
}

# 按用户ID限流
location = /ratelimit/demo {
    limit_req zone=userzone;
    echo "正常的响应";
}

# 自定义错误页面
location = /50x.html {
    echo "限流后的降级内容";
}
error_page 502 503 =200 /50x.html;

8.3 三种限流模式详解

8.3.1 严格限流(无缓冲)

limit_req zone=limti_req_zone;
  • 严格按照配置的速率处理请求
  • 超出速率的请求立即拒绝
  • 无请求延时

8.3.2 带缓冲的限流

limit_req zone=limti_req_zone burst=5;
  • 严格按照配置的速率处理请求
  • 设置大小为5的缓冲队列,请求在队列中等待
  • 超出缓冲队列的请求被拒绝
  • 有请求延时

8.3.3 瞬时处理能力

limit_req zone=req_zone burst=5 nodelay;
  • 允许瞬时处理(burst + rate)个请求
  • 峰值范围内的请求不存在等待
  • 超出峰值的请求被拒绝

九、分布式限流:Redis+Lua实现

9.1 为什么需要分布式限流?

Nginx限流仅在同一节点内有效,而在生产环境中,网关通常是多节点部署。为实现全局统一限流,需要分布式限流组件。

9.2 Redis+Lua限流脚本

--- 申请令牌
--- -1 失败
--- 1 成功
--- @param key 限流关键字
--- @param apply 申请的令牌数量
local function acquire(key, apply)
    local times = redis.call('TIME');
    -- 计算当前毫秒时间戳
    local curr_mill_second = times[1] * 1000000 + times[2];
    curr_mill_second = curr_mill_second / 1000;
    
    -- 获取限流信息
    local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
    local last_mill_second = cacheInfo[1];
    local curr_permits = tonumber(cacheInfo[2]);
    local max_permits = tonumber(cacheInfo[3]);
    local rate = cacheInfo[4];
    
    local local_curr_permits = 0;
    if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
        -- 计算时间段内的令牌数
        local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate);
        -- 令牌总数
        local expect_curr_permits = reverse_permits + curr_permits;
        -- 可以申请的令牌总数
        local_curr_permits = math.min(expect_curr_permits, max_permits);
    else
        -- 第一次获取令牌
        redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
        local_curr_permits = max_permits;
    end
    
    -- 有足够的令牌可以申请
    if (local_curr_permits - apply >= 0) then
        -- 保存剩余的令牌
        redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply);
        -- 保存时间
        redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
        return 1;
    else
        return -1;
    end
end

--- 初始化限流
--- @param key 限流关键字
--- @param max_permits 桶的容量
--- @param rate 令牌的发放速率
local function init(key, max_permits, rate)
    local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
    local org_max_permits = tonumber(rate_limit_info[3])
    local org_rate = rate_limit_info[4]
    if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) then
        redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits)
    end
    return 1;
end

--- 删除限流
local function delete(key)
    redis.pcall("DEL", key)
    return 1;
end

local key = KEYS[1]
local method = ARGV[1]
if method == 'acquire' then
    return acquire(key, ARGV[2])
elseif method == 'init' then
    return init(key, ARGV[2], ARGV[3])
elseif method == 'delete' then
    return delete(key)
else
    -- 忽略
end

9.3 脚本加载与调用

# 加载脚本并获取SHA1
/usr/local/redis/bin/redis-cli -a 123456 script load "$(cat rate_limiter.lua)"

# 初始化限流
/usr/local/redis/bin/redis-cli -a 123456 evalsha "cf43613f172388c34a1130a760fc699a5ee6f2a9" 1 "rate_limiter:seckill:1" init 1 1

# 申请令牌
/usr/local/redis/bin/redis-cli -a 123456 evalsha "cf43613f172388c34a1130a760fc699a5ee6f2a9" 1 "rate_limiter:seckill:1" acquire 1

十、限流最佳实践

10.1 限流维度选择

  • 用户维度限流:基于用户ID或IP,防止恶意用户刷量
  • 接口维度限流:保护特定接口不被过度调用
  • 服务维度限流:保护下游服务不被过载
  • 商品维度限流:如秒杀场景中限制单个商品的请求量

10.2 限流策略选择

算法 适用场景 优点 缺点
计数器 简单限流、固定窗口 实现简单 临界问题严重
滑动窗口 需要精确限流的常规场景 解决临界问题、实现相对简单 无法应对突发流量
漏桶 需要平滑流量的场景 流量平滑 无法应对突发流量
令牌桶 需要处理突发流量的场景 灵活应对突发 实现相对复杂

10.3 限流实施建议

  1. 分级限流:接入层、服务层、数据层多级限流
  2. 动态调整:根据系统负载动态调整限流阈值
  3. 降级配合:限流与服务降级配合使用
  4. 监控告警:实时监控限流情况,及时告警
  5. 友好提示:对被限流的用户返回友好提示

十一、四大限流算法对比总结

算法 优点 缺点 适用场景
计数器 实现简单 临界问题严重 简单场景、低精度要求
滑动窗口 解决临界问题、实现相对简单 无法应对突发流量 需要精确限流的常规场景
漏桶 流量整形、平滑输出 无法应对突发流量 需要平滑流量的场景
令牌桶 支持突发流量、灵活 实现相对复杂 需要处理突发流量的场景

11.1 如何选择限流算法?

  1. 系统简单、要求不高:使用计数器算法
  2. 需要精确限流且实现简单:使用滑动窗口算法
  3. 需要平滑流量输出:使用漏桶算法
  4. 需要处理突发流量:使用令牌桶算法
  5. 分布式系统:使用Redis+Lua实现分布式限流

十二、总结

限流是保障系统高可用的关键手段。详细介绍了四大限流算法:计数器、滑动窗口、漏桶和令牌桶,分析了它们的原理、实现和适用场景。

  • 计数器算法简单但存在临界问题
  • 滑动窗口算法解决了计数器的临界问题,实现相对简单
  • 漏桶算法能平滑流量但无法应对突发
  • 令牌桶算法灵活,能应对突发流量,推荐作为首选
  • Nginx限流适用于网关层,简单高效
  • 分布式限流需借助Redis+Lua实现全局控制

在实际应用中,应根据具体业务场景选择合适的限流策略,并结合监控、降级等手段,构建全方位的系统保护机制。记住,限流不是目的,而是保障系统稳定可用的手段

随着系统复杂度的提升,自适应限流将成为未来趋势。自适应限流能够根据系统实时负载动态调整限流阈值,实现更智能的流量控制。例如,可以根据CPU使用率、内存使用率、响应时间等指标,自动调整限流阈值,实现系统资源的最优利用。

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到