漏桶算法
漏桶算法解决了时间窗口类算法的痛点,可以使流量更加的平滑;
漏桶(Leaky Bucket
)算法可以理解为注水漏水的过程,往漏桶中以任意速率流入水,以固定的速率流出水。当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。
- 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的;
- 桶的容量一般用来表示系统所能处理的请求数;
- 如果桶的容量满了,也就达到限流的阀值,会丢弃水滴(即:拒绝请求);
- 流出的水滴,是恒定速率的,用来表示服务按照固定的速率处理请求。
消息中间件MQ采用的正是漏桶的思想,水滴的流入和留出可以看做是生产者消费者模式;
- 请求是一个生产者,每一个请求都如一滴水,请求到来后放到一个队列(漏桶)中;
- 桶底有一个孔,不断的漏出水滴,就像消费者不断的消费队列中的内容,并且消费的速率(漏出的速度)等于限流阈值。
package com.saint.algorithm.limiting;
import java.time.LocalDateTime;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 漏桶算法
* <p>
* 可以理解为注水漏水的过程,往漏桶中以任意速率流入水,以固定的速率流出水。
* 当水超过桶的容量时,会被溢出,也就是被丢弃。因为桶容量是不变的,保证了整体的速率。
* 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的;
* 桶的容量一般用来表示系统所能处理的请求数;如果桶的容量满了,也就达到限流的阀值,会丢弃水滴(即:拒绝请求);
* 流出的水滴,是恒定速率的,用来表示服务按照固定的速率处理请求。
*
* @author Saint
*/
public class LeakyBucketRateLimiter {
private int qps = 2;
// 漏桶
private LinkedBlockingQueue<Character> waterBucket;
// 表示桶是否被初始化
private volatile boolean initialized = false;
// 关联一个漏桶(一般情况为自身)
private static volatile LeakyBucketRateLimiter leakyBucketRateLimiter;
public static LeakyBucketRateLimiter getLeakyBucket() {
return leakyBucketRateLimiter;
}
/**
* 创建一个漏桶
*
* @param capacity 漏桶容量
* @param qps 漏桶支持的QPS
* @return
*/
public static LeakyBucketRateLimiter create(int capacity, int qps) {
leakyBucketRateLimiter = new LeakyBucketRateLimiter(capacity, qps);
return leakyBucketRateLimiter;
}
private LeakyBucketRateLimiter(int capacity, int qps) {
// 漏桶只能被初始化一次
if (!initialized) {
this.qps = qps;
waterBucket = new LinkedBlockingQueue<>(capacity);
// 初始化消费者
initConsumer();
}
}
/**
* 漏桶中的水以固定速率流出
*/
private void initConsumer() {
new Thread(() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(1000 / qps);
} catch (InterruptedException e) {
// log for exception
System.out.println("Exception occur! " + e);
}
// 以固定速率消费
waterBucket.poll();
}
}).start();
}
/**
* 是否将请求加入到漏桶,能存入则代表漏桶没满,允许请求通过,返回true;否则返回false
*
* @return
*/
public boolean tryAcquire() {
return waterBucket.offer('S');
}
/**
* 漏桶容量2,每秒流出2滴水;
* 最初漏桶中没有水,所以第一秒立刻打进去两滴水;又由于漏桶每秒流出2滴水,所以在程序开始跑时,第一秒必回流出一滴水(极限情况2滴)
* 所以第一秒会进入3滴水,第一秒之后每秒稳定流入2滴水,即QPS为2;
*
* @param args
*/
public static void main(String[] args) {
LeakyBucketRateLimiter leakyBucket = LeakyBucketRateLimiter.create(2, 2);
for (int i = 0; i < 20; i++) {
LocalDateTime now = LocalDateTime.now();
if (leakyBucket.tryAcquire()) {
System.out.println(now + " pass the rate limiting");
} else {
System.out.println(now + " was limited");
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
// 日志记录
}
}
System.out.println("------------");
// 再次获取漏桶
LeakyBucketRateLimiter leakyBucket2 = LeakyBucketRateLimiter.getLeakyBucket();
// 验证漏桶只会有一个
System.out.println("leakyBucket only one ? " + (leakyBucket == leakyBucket2));
for (int i = 0; i < 10; i++) {
LocalDateTime now = LocalDateTime.now();
if (leakyBucket2.tryAcquire()) {
System.out.println(now + " pass the rate limiting");
} else {
System.out.println(now + " was limited");
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
// 日志记录
}
}
}
}
代码解释:
- 漏桶容量2,每秒流出2滴水;
- 最初漏桶中没有水,所以第一秒立刻打进去两滴水;又由于漏桶每秒流出2滴水,所以在程序开始跑时,第一秒必回流出一滴水(极限情况2滴)
- 所以第一秒会进入3滴水,第一秒之后每秒稳定流入2滴水,即QPS为2;
令牌桶算法
令牌桶(Token Bucket
)算法是对漏桶算法的一种改进,不仅能够平滑限流,还允许一定程度的流量突发;它是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。
令牌桶的实现思路也类似于生产者和消费之间的关系:
系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 500,每 2ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
请求的执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;
如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝请求,以此达到限流目的。
令牌桶的实现特点:
- 1s / 限流阈值(QPS) = 令牌添加时间间隔;
- 桶的容量可以大于限流的阈值(做一定的冗余),令牌数量达到桶容量时,不再添加;
- 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理;
- 令牌桶启动时桶中无令牌,启动后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这个问题。
Google 的 Java 开发工具包 Guava 中的限流工具类 RateLimiter 就是令牌桶的一个实现
package com.saint.algorithm.limiting;
import java.time.LocalDateTime;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 令牌桶算法
* 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 500,每 2ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
* 请求的执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;
* 如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝请求,以此达到限流目的。
* <p>
* 1s / 限流阈值(QPS) = 令牌添加时间间隔;
* 桶的容量可以大于限流的阈值(做一定的冗余),令牌数量达到桶容量时,不再添加;
*
* @author Saint
*/
public class TokenBucketRateLimiter {
private int qps = 2;
// 漏桶
private volatile LinkedBlockingQueue<Character> tokenBucket = null;
// 表示桶是否被初始化
private volatile boolean initialized = false;
// 关联一个漏桶(一般情况为自身)
private static volatile TokenBucketRateLimiter tokenBucketRateLimiter;
public static TokenBucketRateLimiter getTokenBucket() {
return tokenBucketRateLimiter;
}
/**
* 创建一个漏桶
*
* @param capacity 漏桶容量
* @param qps 漏桶支持的QPS
* @return
*/
public static TokenBucketRateLimiter create(int capacity, int qps) {
tokenBucketRateLimiter = new TokenBucketRateLimiter(capacity, qps);
return tokenBucketRateLimiter;
}
private TokenBucketRateLimiter(int capacity, int qps) {
// 漏桶只能被初始化一次
if (!initialized) {
this.qps = qps;
tokenBucket = new LinkedBlockingQueue<>(capacity);
// 初始化生产者
initProducer();
}
}
/**
* 令牌桶中的令牌以固定速率加入
*/
private void initProducer() {
// 令牌桶初始容量为qps
for (int i = 0; i < qps; i++) {
tokenBucket.offer('S');
}
new Thread(() -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(1000 / qps);
} catch (InterruptedException e) {
// log for exception
System.out.println("Exception occur! " + e);
}
// 以固定速率生产令牌
tokenBucket.offer('S');
}
}).start();
}
/**
* 获取令牌,能获取到,允许请求通过,返回true;否则返回false
*
* @return
*/
public boolean tryAcquire() {
return tokenBucket.poll() == null ? false : true;
}
/**
* 令牌桶容量为2,每秒加入2个令牌;
* 最初漏桶中有两个令牌,所以第一秒立刻获取到两个令牌;又由于令牌桶每秒加入2个令牌,所以在程序开始跑时,第一秒必回加入一个令牌(极限情况2个)
* 所以第一秒会获取到3个令牌,第一秒之后每秒稳定获取到两个令牌,即QPS为2;
*
* @param args
*/
public static void main(String[] args) {
TokenBucketRateLimiter tokenBucket = TokenBucketRateLimiter.create(2, 2);
for (int i = 0; i < 20; i++) {
LocalDateTime now = LocalDateTime.now();
if (tokenBucket.tryAcquire()) {
System.out.println(now + " pass the rate limiting");
} else {
System.out.println(now + " was limited");
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
// 日志记录
}
}
System.out.println("------------");
// 再次获取漏桶
TokenBucketRateLimiter tokenBucket2 = TokenBucketRateLimiter.getTokenBucket();
// 验证令牌桶只会有一个
System.out.println("tokenBucket only one ? " + (tokenBucket == tokenBucket2));
for (int i = 0; i < 10; i++) {
LocalDateTime now = LocalDateTime.now();
if (tokenBucket2.tryAcquire()) {
System.out.println(now + " pass the rate limiting");
} else {
System.out.println(now + " was limited");
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
// 日志记录
}
}
}
}
代码解释:
- 令牌桶容量为2,每秒加入2个令牌;
- 最初漏桶中有两个令牌,所以第一秒立刻获取到两个令牌;又由于令牌桶每秒加入2个令牌,所以在程序开始跑时,第一秒必会加入一个令牌(极限情况2个)
- 所以第一秒会获取到3个令牌,第一秒之后每秒稳定获取到两个令牌,即QPS为2;
本文含有隐藏内容,请 开通VIP 后查看