手撕漏桶&令牌桶限流算法(Java版)

发布于:2023-01-14 ⋅ 阅读:(208) ⋅ 点赞:(0)

漏桶算法

漏桶算法解决了时间窗口类算法的痛点,可以使流量更加的平滑;

漏桶(Leaky Bucket)算法可以理解为注水漏水的过程,往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。

  • 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的;
  • 桶的容量一般用来表示系统所能处理的请求数;
    • 如果桶的容量满了,也就达到限流的阀值,会丢弃水滴(即:拒绝请求);
  • 流出的水滴,是恒定速率的,用来表示服务按照固定的速率处理请求。

消息中间件MQ采用的正是漏桶的思想,水滴的流入和留出可以看做是生产者消费者模式;

  • 请求是一个生产者,每一个请求都如一滴水,请求到来后放到一个队列(漏桶)中;
  • 桶底有一个孔,不断的漏出水滴,就像消费者不断的消费队列中的内容,并且消费的速率(漏出的速度)等于限流阈值。
package com.saint.algorithm.limiting;

import java.time.LocalDateTime;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 漏桶算法
 * <p>
 * 可以理解为注水漏水的过程,往漏桶中以任意速率流入水,以固定的速率流出水。
 * 当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。
 * 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的;
 * 桶的容量一般用来表示系统所能处理的请求数;如果桶的容量满了,也就达到限流的阀值,会丢弃水滴(即:拒绝请求);
 * 流出的水滴,是恒定速率的,用来表示服务按照固定的速率处理请求。
 *
 * @author Saint
 */
public class LeakyBucketRateLimiter {

    private int qps = 2;

    // 漏桶
    private LinkedBlockingQueue<Character> waterBucket;

    // 表示桶是否被初始化
    private volatile boolean initialized = false;

    // 关联一个漏桶(一般情况为自身)
    private static volatile LeakyBucketRateLimiter leakyBucketRateLimiter;

    public static LeakyBucketRateLimiter getLeakyBucket() {
        return leakyBucketRateLimiter;
    }

    /**
     * 创建一个漏桶
     *
     * @param capacity 漏桶容量
     * @param qps      漏桶支持的QPS
     * @return
     */
    public static LeakyBucketRateLimiter create(int capacity, int qps) {
        leakyBucketRateLimiter = new LeakyBucketRateLimiter(capacity, qps);
        return leakyBucketRateLimiter;
    }

    private LeakyBucketRateLimiter(int capacity, int qps) {
        // 漏桶只能被初始化一次
        if (!initialized) {
            this.qps = qps;
            waterBucket = new LinkedBlockingQueue<>(capacity);
            // 初始化消费者
            initConsumer();
        }
    }

    /**
     * 漏桶中的水以固定速率流出
     */
    private void initConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 / qps);
                } catch (InterruptedException e) {
                    // log for exception
                    System.out.println("Exception occur! " + e);
                }
                // 以固定速率消费
                waterBucket.poll();
            }
        }).start();
    }

    /**
     * 是否将请求加入到漏桶,能存入则代表漏桶没满,允许请求通过,返回true;否则返回false
     *
     * @return
     */
    public boolean tryAcquire() {
        return waterBucket.offer('S');
    }

    /**
     * 漏桶容量2,每秒流出2滴水;
     * 最初漏桶中没有水,所以第一秒立刻打进去两滴水;又由于漏桶每秒流出2滴水,所以在程序开始跑时,第一秒必回流出一滴水(极限情况2滴)
     * 所以第一秒会进入3滴水,第一秒之后每秒稳定流入2滴水,即QPS为2;
     *
     * @param args
     */
    public static void main(String[] args) {
        LeakyBucketRateLimiter leakyBucket = LeakyBucketRateLimiter.create(2, 2);
        for (int i = 0; i < 20; i++) {
            LocalDateTime now = LocalDateTime.now();
            if (leakyBucket.tryAcquire()) {
                System.out.println(now + " pass the rate limiting");
            } else {
                System.out.println(now + " was limited");
            }

            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                // 日志记录
            }

        }

        System.out.println("------------");
        // 再次获取漏桶
        LeakyBucketRateLimiter leakyBucket2 = LeakyBucketRateLimiter.getLeakyBucket();

        // 验证漏桶只会有一个
        System.out.println("leakyBucket only one ? " + (leakyBucket == leakyBucket2));

        for (int i = 0; i < 10; i++) {
            LocalDateTime now = LocalDateTime.now();
            if (leakyBucket2.tryAcquire()) {
                System.out.println(now + " pass the rate limiting");
            } else {
                System.out.println(now + " was limited");
            }

            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                // 日志记录
            }

        }
    }

}

代码解释:

  • 漏桶容量2,每秒流出2滴水;
  • 最初漏桶中没有水,所以第一秒立刻打进去两滴水;又由于漏桶每秒流出2滴水,所以在程序开始跑时,第一秒必回流出一滴水(极限情况2滴)
  • 所以第一秒会进入3滴水,第一秒之后每秒稳定流入2滴水,即QPS为2;

令牌桶算法

令牌桶(Token Bucket)算法是对漏桶算法的一种改进,不仅能够平滑限流,还允许一定程度的流量突发;它是网络流量整形(Traffic Shaping)速率限制(Rate Limiting)中最常使用的一种算法。

令牌桶的实现思路也类似于生产者和消费之间的关系:

  • 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 500,每 2ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。

  • 请求的执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;

  • 如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝请求,以此达到限流目的。

令牌桶的实现特点:

  1. 1s / 限流阈值(QPS) = 令牌添加时间间隔;
  2. 桶的容量可以大于限流的阈值(做一定的冗余),令牌数量达到桶容量时,不再添加;
  3. 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理;
  4. 令牌桶启动时桶中无令牌,启动后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这个问题。

Google 的 Java 开发工具包 Guava 中的限流工具类 RateLimiter 就是令牌桶的一个实现

package com.saint.algorithm.limiting;

import java.time.LocalDateTime;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 令牌桶算法
 * 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 500,每 2ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
 * 请求的执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;
 * 如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝请求,以此达到限流目的。
 * <p>
 * 1s / 限流阈值(QPS)  = 令牌添加时间间隔;
 * 桶的容量可以大于限流的阈值(做一定的冗余),令牌数量达到桶容量时,不再添加;
 *
 * @author Saint
 */
public class TokenBucketRateLimiter {

    private int qps = 2;

    // 漏桶
    private volatile LinkedBlockingQueue<Character> tokenBucket = null;

    // 表示桶是否被初始化
    private volatile boolean initialized = false;

    // 关联一个漏桶(一般情况为自身)
    private static volatile TokenBucketRateLimiter tokenBucketRateLimiter;

    public static TokenBucketRateLimiter getTokenBucket() {
        return tokenBucketRateLimiter;
    }

    /**
     * 创建一个漏桶
     *
     * @param capacity 漏桶容量
     * @param qps      漏桶支持的QPS
     * @return
     */
    public static TokenBucketRateLimiter create(int capacity, int qps) {
        tokenBucketRateLimiter = new TokenBucketRateLimiter(capacity, qps);
        return tokenBucketRateLimiter;
    }

    private TokenBucketRateLimiter(int capacity, int qps) {
        // 漏桶只能被初始化一次
        if (!initialized) {
            this.qps = qps;
            tokenBucket = new LinkedBlockingQueue<>(capacity);
            // 初始化生产者
            initProducer();
        }
    }

    /**
     * 令牌桶中的令牌以固定速率加入
     */
    private void initProducer() {
        // 令牌桶初始容量为qps
        for (int i = 0; i < qps; i++) {
            tokenBucket.offer('S');
        }
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 / qps);
                } catch (InterruptedException e) {
                    // log for exception
                    System.out.println("Exception occur! " + e);
                }
                // 以固定速率生产令牌
                tokenBucket.offer('S');
            }
        }).start();
    }

    /**
     * 获取令牌,能获取到,允许请求通过,返回true;否则返回false
     *
     * @return
     */
    public boolean tryAcquire() {
        return tokenBucket.poll() == null ? false : true;
    }

    /**
     * 令牌桶容量为2,每秒加入2个令牌;
     * 最初漏桶中有两个令牌,所以第一秒立刻获取到两个令牌;又由于令牌桶每秒加入2个令牌,所以在程序开始跑时,第一秒必回加入一个令牌(极限情况2个)
     * 所以第一秒会获取到3个令牌,第一秒之后每秒稳定获取到两个令牌,即QPS为2;
     *
     * @param args
     */
    public static void main(String[] args) {
        TokenBucketRateLimiter tokenBucket = TokenBucketRateLimiter.create(2, 2);
        for (int i = 0; i < 20; i++) {
            LocalDateTime now = LocalDateTime.now();
            if (tokenBucket.tryAcquire()) {
                System.out.println(now + " pass the rate limiting");
            } else {
                System.out.println(now + " was limited");
            }

            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                // 日志记录
            }

        }

        System.out.println("------------");
        // 再次获取漏桶
        TokenBucketRateLimiter tokenBucket2 = TokenBucketRateLimiter.getTokenBucket();

        // 验证令牌桶只会有一个
        System.out.println("tokenBucket only one ? " + (tokenBucket == tokenBucket2));

        for (int i = 0; i < 10; i++) {
            LocalDateTime now = LocalDateTime.now();
            if (tokenBucket2.tryAcquire()) {
                System.out.println(now + " pass the rate limiting");
            } else {
                System.out.println(now + " was limited");
            }

            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                // 日志记录
            }

        }
    }

}

代码解释:

  • 令牌桶容量为2,每秒加入2个令牌;
  • 最初漏桶中有两个令牌,所以第一秒立刻获取到两个令牌;又由于令牌桶每秒加入2个令牌,所以在程序开始跑时,第一秒必会加入一个令牌(极限情况2个)
  • 所以第一秒会获取到3个令牌,第一秒之后每秒稳定获取到两个令牌,即QPS为2;
本文含有隐藏内容,请 开通VIP 后查看