任何一个服务都有其承载能力,当流量大于这个阈值时,客户端请求就不能全部被正常处理,严重时,甚至还会发生雪崩效应,一个api出现问题,都会导致整个服务全部down机,因此,为了避免服务承载流量过大,为其增加限流机制,是保证服务平稳运行最重要且有效的手段。
草原天路-桦皮岭
限制流量,旨在确保服务的稳定性、可用性和公平性,同时防止恶意攻击或资源的滥用,根据不同的应用场景,常规的限流手段可以分为以下几类:
粒度 |
说明 |
示例 |
全局限流 |
限制整个系统最大QPS,防止系统过载 |
全站最大QPS100 |
用户级限流 |
控制单个用户的访问频率 |
每用户每秒最多10次请求 |
IP限流 |
对单IP限速,防止攻击 |
单IP每分钟最多300请求 |
接口级限流 |
高成本/高风险接口设置更严格的限流 |
推理接口,每秒最多5次 |
下面我们就以golang.org/x/time/rate包为核心,实现不同的限流器。
1、全局限流方案
golang在线服务,很多都是使用了gin框架,我们就以gin为例,来说明全局限流器的实现,首先定义全局限流器,并初始化:
var (
GlobalLimiter *rate.Limiter
GlobalLimiterCnt = 100
GlobalLimiterMax = 150
)
func LimitInit() {
// 这里可以初始化一些全局变量或者执行一些初始化的操作
resource.GlobalLimiter = rate.NewLimiter(rate.Limit(resource.GlobalLimiterCnt), resource.GlobalLimiterMax)
}
以上代码的含义是,初始化一个限流器,qps限制100,峰值150。
实现gin中间件:
func GlobalLimiter(ctx *gin.Context) {
if !resource.GlobalLimiter.Allow() {
logservice.Notice("too many requests")
ctx.AbortWithStatus(http.StatusTooManyRequests)
return
}
ctx.Next()
}
该函数的含义很简单,当限流器不允许时,打印日志,http状态码返回429。
在路由中使用该中间件:
func Register(engine *gin.Engine) {
engine.Use(middlewares.Print, middlewares.GlobalLimiter)
engine.POST("mianhuatang/image_search", controllers.ImageSearch)
engine.POST("mianhuatang/load/articles", controllers.LoadArticles)
}
压测工具使用开源的【hey】,hey 是由 Golang 开发的高性能 HTTP 压力测试工具,支持多线程并发请求、异步 I/O 处理及复杂 HTTP 请求生成。其核心优势在于轻量级、高并发和低资源消耗,适用于快速评估 Web 应用在高负载下的性能表现。
启动服务,进行压力测试,观察限流情况,为了便于观察限流情况,全局限流设置为1qps,峰值5qps。
我们先构造50个请求,单线程发送,速率限制在5qps,hey工具命令行如下:
hey -n 50 -c 1 -q 5 -m POST http://192.168.3.65:8200/mianhuatang/load/articles
-n:设置发送的请求数量
-c:发送请求的线程数量
-q:单线程发送请求速率
-m:http 方法
运行结果如下:
一共发送50个请求,5qps的速率发送,server限速1qps,峰值5qps,我们看上图中如下信息:
Status code distribution:
[200] 14 responses
[429] 36 responses
正常处理14个请求,其他36个请求被拒绝,因为服务限速1qps,所以5qps速率,其中有4个请求会被拒绝,这样算应该只能处理10个请求才对,为什么会多处理了4个呢?原因就在服务的峰值qps上,我们设置的是5,因此第一秒的5个请求是都能处理的,这就是为什么多出来4个的原因。
2、基于客户端IP进行限流
通过IP、接口、用户的限流方案是一样的,都是基于不同的key设置独立的限流器,从而达到限流的目的,我们就以客户端IP限流为例进行说明。
首先声明一个字典,用于存储不同的限流器:
var (
Mux sync.Mutex
IpLimiter map[string]*rate.Limiter
IpLimiterCnt = 1
IpLimiterMax = 5
)
初始化IpLimiter:
func LimitInit() {
// 这里可以初始化一些全局变量或者执行一些初始化的操作
resource.IpLimiter = make(map[string]*rate.Limiter)
}
实现限流中间件:
func getIpLimiter(ip string) *rate.Limiter {
// 加锁,确保线程安全
resource.Mux.Lock()
defer resource.Mux.Unlock()
// 从全局变量中获取当前IP对应的限流器
limiter, exists := resource.IpLimiter[ip]
if !exists {
// 创建一个新的限流器,设置速率和桶的大小
limiter = rate.NewLimiter(rate.Limit(resource.IpLimiterCnt), resource.IpLimiterMax)
// 将新的限流器添加到全局变量中
resource.IpLimiter[ip] = limiter
}
// 返回限流器
return limiter
}
func IpLimitMiddleware(ctx *gin.Context) {
// 获取客户端的IP地址
ip := ctx.ClientIP()
// 根据IP地址获取访问限制器
limiter := getIpLimiter(ip)
// 判断访问限制器是否允许访问
if !limiter.Allow() {
// 如果不允许访问,记录日志
logservice.Notice("too many requests,ip:" + ip)
// 返回状态码429,表示请求过多
ctx.AbortWithStatus(http.StatusTooManyRequests)
return
}
// 允许访问,继续执行下一个中间件或处理程序
ctx.Next()
}
在路由中使用该中间件:
func Register(engine *gin.Engine) {
// engine.Use(middlewares.Print, middlewares.GlobalLimiter)
engine.Use(middlewares.Print, middlewares.IpLimitMiddleware)
}
测试:
使用两台电脑,分别向服务进行压力测试,发送速率:5qps,共50个请求。
预期应该是这样的,由于是通过ip限流,两台电脑的请求,5qps的速率,每个client都能够处理1qps的流量。
上图中,192.168.3.15以及192.168.3.65两个ip,5个请求中能够处理一个,其他请求被拒绝了,实现了按ip进行限流的方案。
以上两种限流方案,基本上就可以满足日常开发的需要,如果是普通的C端服务,增加全局限流即可,而对于一些特殊的服务,比如包含推理、大数据检索等耗时的api,最好是通过api进行限流,以更大限度的保证服务的高可用和低延时。
以上两种方案,希望能够帮到大家。
欢迎各位热爱技术的小伙伴们点个关注,聊聊技术、聊聊跑步~~~。
往期推荐:
2025越山向海张家口站:从草原天路到崇礼142.6公里,我们一起跑过。
大同华严寺:受人民爱戴的耿市长还会回来吗?薄伽教藏的合掌漏齿菩萨你知道是谁吗?
云冈石窟:翻开这本距今1565年、与天地同久长的石头史书,感受北魏王朝雕刻艺术的巅峰之作。
Elastic:索引生命周期管理(Index Lifecycle Management)-减轻ES存储压力
RabbitMq:消费侧未限流导致的rabbitmq报错异常,消息不能正常消费。
一个异步架构设计:批量消费RabbitMQ,批量写入Elasticsearch(golang实现)
一文揭秘:Golang+Elasticsearch轻松搭建AI时代的图片搜索服务
tritonserver学习之四:tritonserver运行流程
tritonserver学习之九:tritonserver grpc异步模式
tritonserver学习之八:redis_caches实践
tritonserver学习之六:自定义c++、python custom backend实践