这个文件实现了一个基于信号量(Channel)的简单限流器。
基础知识
总共有四种channel
带缓冲的channel
nonMutatingChan、mutatingChan 都是带缓冲的channel ,这类channel 的特点是:
这允许最多 mutatingLimit /nonMutatingLimit 个请求同时获取令牌并执行,直到缓冲区满了才会阻塞新的请求。对带缓冲 channel 的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收不需要阻塞等待)。
但当缓冲区满了的情况下,对它进行发送操作的 Goroutine 就会阻塞挂起;当缓冲区为空的情况下,对它进行接收操作的 Goroutine 也会阻塞挂起。
本代码中限流通道从创建到服务器关闭始终保持打开状态,因为这些通道用于控制并发请求数量的计数器,需要持续使用。
不带缓冲的channel
对无缓冲 channel 类型的发送与接收操作,一定要放在两个不同的 Goroutine 中进行,否则会导致 deadlock。
通道类型 | 创建方式 | 主要特点 | 典型使用场景 |
---|---|---|---|
无缓冲通道 | make(chan T) | 同步、阻塞 | 信号通知、精确同步 |
带缓冲通道 | make(chan T, size) | 异步、缓冲 | 限流控制、任务队列 |
只读通道 | <-chan T | 只能接收 | 消费者模式、类型安全 |
只写通道 | chan<- T | 只能发送 | 生产者模式、类型安全 |
nil 通道 | var ch chan T | 阻塞操作 | 条件化通信、禁用分支 |
Kubernetes 的 maxinflight 限流器选择带缓冲通道作为实现方式,正是利用了其特有的信号量特性、非阻塞操作和容量控制能力。
当 select 语句中没有 default 分支,而且所有 case 中的 channel 操作都阻塞了的时候,整个 select 语句都将被阻塞,直到某一个 case 上的 channel 变成可发送,或者某个 case 上的 channel 变成可接收,select 语句才可以继续进行下去。
Kubernetes API Server maxinflight.go 限流分析
这个文件实现了一个基于信号量(Channel)的简单限流器,主要使用了两种限流通道:
1. 限流分类
var nonMutatingChan chan bool // 非修改性请求的限流通道
var mutatingChan chan bool // 修改性请求的限流通道
请求分类
- 非修改性请求(Non-Mutating):get、list、watch 操作
- 修改性请求(Mutating):create、update、patch、delete 等操作
2. 限流实现机制
核心实现使用了带缓冲的 Channel 作为信号量:
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit) // 非修改性请求限流器
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit) // 修改性请求限流器
}
限流处理流程
- 请求到达时:
select {
case c <- true: // 尝试获取令牌
// 处理请求
defer func() {
<-c // 释放令牌
}()
handler.ServeHTTP(w, r)
default:
// 获取不到令牌时返回 429 Too Many Requests
tooManyRequests(r, w, retryAfter)
}
这里的select 实现了快速失败策略,所有的请求,要么被处理,要么直接扔到default中。 实现了非阻塞并发
2. 特权用户处理:
// 系统管理员(system:master)不受限流影响
if currUser, ok := apirequest.UserFrom(ctx); ok {
for _, group := range currUser.GetGroups() {
if group == user.SystemPrivilegedGroup {
handler.ServeHTTP(w, r)
return
}
}
}
3. 监控指标
使用 watermark 记录并发请求数:
type requestWatermark struct {
phase string
readOnlyObserver, mutatingObserver fcmetrics.RatioedGauge
readOnlyWatermark, mutatingWatermark int
lock sync.Mutex
}
指标更新
- 定期更新 inflight 使用指标
- 记录最大并发数
- 分别统计读写请求
4. 特点总结
简单高效
- 使用 Channel 实现,开销小
- 无需复杂的算法
区分请求类型
- 读写请求分开限流
- 更细粒度的控制
特权用户豁免
- 系统管理员不受限
- 确保关键操作可执行
监控完善
- 记录限流指标
- 支持监控告警
无时间窗口
- 不同于令牌桶等算法
- 纯并发数控制
这是一个轻量级但有效的限流方案,适合 API Server 的需求。
Kubernetes API Server 中的两种限流通道分析
Kubernetes API Server 在 maxinflight.go 文件中实现了两种不同的限流通道,分别用于处理不同类型的请求:
1. 非修改性请求限流通道 (nonMutatingChan)
var nonMutatingChan chan bool
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
}
- 处理的请求类型:
get
、list
、watch
等读取操作 - 判断依据:
nonMutatingRequestVerbs.Has(requestInfo.Verb)
- 目的:限制并发读取操作的数量
2. 修改性请求限流通道 (mutatingChan)
var mutatingChan chan bool
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
}
- 处理的请求类型:
create
、update
、delete
、patch
等写入操作 - 判断依据:
!nonMutatingRequestVerbs.Has(requestInfo.Verb)
- 目的:限制并发写入操作的数量
主要区别
目标请求不同
nonMutatingChan
针对读操作mutatingChan
针对写操作
资源消耗的差异
- 写操作通常消耗更多资源,可能会修改集群状态
- 读操作相对轻量,但数量可能更大
限流阈值不同
- API Server 配置中可以为这两种通道设置不同的限流阈值
- 通常写操作的限流阈值会低于读操作
监控指标分离
- 代码中对两种请求的监控也是分开的:
watermark.recordMutating(len(c)) watermark.recordReadOnly(len(c))
- 代码中对两种请求的监控也是分开的:
限流效果的差异
- 当读请求过多时,只会拒绝新的读请求,写请求不受影响
- 当写请求过多时,只会拒绝新的写请求,读请求不受影响
实现原理
两种通道的底层实现方式相同,都是使用带缓冲的 channel 作为信号量:
select {
case c <- true: // 尝试获取令牌
// 处理请求并记录指标
defer func() {
<-c // 释放令牌
}()
handler.ServeHTTP(w, r)
default:
// 无法获取令牌,返回 429 Too Many Requests
tooManyRequests(r, w, retryAfter)
}
这种区分读写请求的限流设计,使 Kubernetes API Server 能够在高负载情况下更有效地分配资源,确保系统的稳定性和可用性。
限流参数 nonMutatingLimit 、mutatingLimit 意义:
这段代码:
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler
}
意思解释
这是一个前置检查,用于判断是否需要启用限流功能:
- 如果
nonMutatingLimit
和mutatingLimit
都为 0,表示不需要对任何类型的请求进行限流 - 在这种情况下,函数直接返回原始的
handler
,不添加任何限流逻辑 - 相当于完全跳过限流处理,请求会直接传递给下一个处理器
为什么需要这个检查
性能优化
- 如果不需要限流,避免创建不必要的通道和记录指标的开销
- 减少请求处理的额外层级,提高性能
功能开关
- 提供一种方式完全禁用限流功能
- 管理员可以通过配置参数控制是否启用限流
兼容性
- 允许那些不需要限流的环境(如小型开发集群)简化配置
- 保持与低资源环境的兼容性
实际应用
在 Kubernetes API Server 配置中,可以通过这些参数控制限流:
apiServer:
maxRequestsInflight: 400 # nonMutatingLimit 参数
maxMutatingRequestsInflight: 200 # mutatingLimit 参数
如果将这两个值都设为 0,API Server 将不会对任何请求进行并发限制,可能适用于:
- 开发/测试环境
- 低负载集群
- 有外部限流机制的环境
这提供了一个简单的开关,使管理员能够灵活控制是否启用 API Server 的内置限流功能。
基于"通过通信共享内存"原则分析maxinflight.go
从Go语言的设计哲学"不要通过共享内存来通信,而要通过通信来共享内存"(Don’t communicate by sharing memory; share memory by communicating)来看,maxinflight.go
有几个违背此原则的地方:
违背原则的地方
1. 共享状态管理方式
// 使用互斥锁保护共享状态
type requestWatermark struct {
phase string
readOnlyObserver, mutatingObserver fcmetrics.RatioedGauge
lock sync.Mutex // 互斥锁
readOnlyWatermark, mutatingWatermark int
}
// 全局共享变量
var watermark = &requestWatermark{
phase: metrics.ExecutingPhase,
}
这里使用了传统的"共享内存+锁"的并发控制模式,而不是Go推荐的基于通道的模式。
2. 记录指标的方法
func (w *requestWatermark) recordMutating(mutatingVal int) {
w.mutatingObserver.Set(float64(mutatingVal))
w.lock.Lock()
defer w.lock.Unlock()
if w.mutatingWatermark < mutatingVal {
w.mutatingWatermark = mutatingVal
}
}
这里直接修改共享状态,而不是通过消息传递。
3. 周期性更新指标
go wait.Until(func() {
watermark.lock.Lock()
readOnlyWatermark := watermark.readOnlyWatermark
mutatingWatermark := watermark.mutatingWatermark
watermark.readOnlyWatermark = 0
watermark.mutatingWatermark = 0
watermark.lock.Unlock()
metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod, stopCh)
使用锁直接访问和修改共享状态,而不是使用通道接收信息。
符合Go哲学的重构方案
根据"通过通信共享内存"的原则,可以这样重构:
// 定义度量指标更新消息
type MetricUpdate struct {
ReadOnly int
Mutating int
}
// 创建通道
var metricCh = make(chan MetricUpdate)
var readOnlyMetricCh = make(chan int)
var mutatingMetricCh = make(chan int)
// 启动指标收集器goroutine
func startMetricCollector(stopCh <-chan struct{}) {
readOnlyMax := 0
mutatingMax := 0
// 定期更新指标
ticker := time.NewTicker(inflightUsageMetricUpdatePeriod)
defer ticker.Stop()
for {
select {
case val := <-readOnlyMetricCh:
if val > readOnlyMax {
readOnlyMax = val
}
case val := <-mutatingMetricCh:
if val > mutatingMax {
mutatingMax = val
}
case <-ticker.C:
// 更新并重置最大值
metrics.UpdateInflightRequestMetrics(metrics.ExecutingPhase, readOnlyMax, mutatingMax)
readOnlyMax = 0
mutatingMax = 0
case <-stopCh:
return
}
}
}
// 记录指标的新函数
func recordReadOnly(val int) {
select {
case readOnlyMetricCh <- val:
// 成功发送
default:
// 通道已满,丢弃
}
}
func recordMutating(val int) {
select {
case mutatingMetricCh <- val:
// 成功发送
default:
// 通道已满,丢弃
}
}
为什么现有实现没有使用通道模式
尽管代码违背了Go的设计哲学,但有几个可能的原因:
历史兼容性:可能是从早期版本演化而来,完全重构成本高
性能考虑:在高频调用的代码路径上,锁可能比通道有更低的开销
简单直接:对于简单的计数器场景,锁实现可能更直观
限流部分确实用了通道:
nonMutatingChan = make(chan bool, nonMutatingLimit) mutatingChan = make(chan bool, mutatingLimit)
这部分确实体现了Go的设计哲学,使用通道的缓冲区容量来限制并发请求数
总结
maxinflight.go中的代码部分遵循了Go的设计哲学(使用通道进行限流),但指标收集部分仍然采用了传统的"共享内存+锁"模式。一个更符合Go哲学的实现应该将指标收集也改为基于通道的模式,消除所有对共享内存的直接访问和修改。