【Kubernetes】API server 限流 之 maxinflight.go

发布于:2025-02-28 ⋅ 阅读:(17) ⋅ 点赞:(0)

这个文件实现了一个基于信号量(Channel)的简单限流器。

基础知识

总共有四种channel

  1. 带缓冲的channel
    nonMutatingChan、mutatingChan 都是带缓冲的channel ,这类channel 的特点是:
    这允许最多 mutatingLimit /nonMutatingLimit 个请求同时获取令牌并执行,直到缓冲区满了才会阻塞新的请求。

    对带缓冲 channel 的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收不需要阻塞等待)。

    但当缓冲区满了的情况下,对它进行发送操作的 Goroutine 就会阻塞挂起;当缓冲区为空的情况下,对它进行接收操作的 Goroutine 也会阻塞挂起。

    本代码中限流通道从创建到服务器关闭始终保持打开状态,因为这些通道用于控制并发请求数量的计数器,需要持续使用。

  2. 不带缓冲的channel

对无缓冲 channel 类型的发送与接收操作,一定要放在两个不同的 Goroutine 中进行,否则会导致 deadlock。

通道类型 创建方式 主要特点 典型使用场景
无缓冲通道 make(chan T) 同步、阻塞 信号通知、精确同步
带缓冲通道 make(chan T, size) 异步、缓冲 限流控制、任务队列
只读通道 <-chan T 只能接收 消费者模式、类型安全
只写通道 chan<- T 只能发送 生产者模式、类型安全
nil 通道 var ch chan T 阻塞操作 条件化通信、禁用分支

Kubernetes 的 maxinflight 限流器选择带缓冲通道作为实现方式,正是利用了其特有的信号量特性、非阻塞操作和容量控制能力。

当 select 语句中没有 default 分支,而且所有 case 中的 channel 操作都阻塞了的时候,整个 select 语句都将被阻塞,直到某一个 case 上的 channel 变成可发送,或者某个 case 上的 channel 变成可接收,select 语句才可以继续进行下去。

Kubernetes API Server maxinflight.go 限流分析

这个文件实现了一个基于信号量(Channel)的简单限流器,主要使用了两种限流通道:

1. 限流分类

var nonMutatingChan chan bool  // 非修改性请求的限流通道
var mutatingChan chan bool     // 修改性请求的限流通道

请求分类

  • 非修改性请求(Non-Mutating):get、list、watch 操作
  • 修改性请求(Mutating):create、update、patch、delete 等操作

2. 限流实现机制

核心实现使用了带缓冲的 Channel 作为信号量:

if nonMutatingLimit != 0 {
    nonMutatingChan = make(chan bool, nonMutatingLimit)  // 非修改性请求限流器
}
if mutatingLimit != 0 {
    mutatingChan = make(chan bool, mutatingLimit)        // 修改性请求限流器
}

限流处理流程

  1. 请求到达时
select {
case c <- true:  // 尝试获取令牌
    // 处理请求
    defer func() {
        <-c    // 释放令牌
    }()
    handler.ServeHTTP(w, r)
default:
    // 获取不到令牌时返回 429 Too Many Requests
    tooManyRequests(r, w, retryAfter)
}

这里的select 实现了快速失败策略,所有的请求,要么被处理,要么直接扔到default中。 实现了非阻塞并发
2. 特权用户处理

// 系统管理员(system:master)不受限流影响
if currUser, ok := apirequest.UserFrom(ctx); ok {
    for _, group := range currUser.GetGroups() {
        if group == user.SystemPrivilegedGroup {
            handler.ServeHTTP(w, r)
            return
        }
    }
}

3. 监控指标

使用 watermark 记录并发请求数:

type requestWatermark struct {
    phase                                string
    readOnlyObserver, mutatingObserver   fcmetrics.RatioedGauge
    readOnlyWatermark, mutatingWatermark int
    lock                                 sync.Mutex
}

指标更新

  • 定期更新 inflight 使用指标
  • 记录最大并发数
  • 分别统计读写请求

4. 特点总结

  1. 简单高效

    • 使用 Channel 实现,开销小
    • 无需复杂的算法
  2. 区分请求类型

    • 读写请求分开限流
    • 更细粒度的控制
  3. 特权用户豁免

    • 系统管理员不受限
    • 确保关键操作可执行
  4. 监控完善

    • 记录限流指标
    • 支持监控告警
  5. 无时间窗口

    • 不同于令牌桶等算法
    • 纯并发数控制

这是一个轻量级但有效的限流方案,适合 API Server 的需求。


Kubernetes API Server 中的两种限流通道分析

Kubernetes API Server 在 maxinflight.go 文件中实现了两种不同的限流通道,分别用于处理不同类型的请求:

1. 非修改性请求限流通道 (nonMutatingChan)

var nonMutatingChan chan bool
if nonMutatingLimit != 0 {
    nonMutatingChan = make(chan bool, nonMutatingLimit)
    klog.V(2).InfoS("Initialized nonMutatingChan", "len", nonMutatingLimit)
}
  • 处理的请求类型getlistwatch 等读取操作
  • 判断依据nonMutatingRequestVerbs.Has(requestInfo.Verb)
  • 目的:限制并发读取操作的数量

2. 修改性请求限流通道 (mutatingChan)

var mutatingChan chan bool
if mutatingLimit != 0 {
    mutatingChan = make(chan bool, mutatingLimit)
    klog.V(2).InfoS("Initialized mutatingChan", "len", mutatingLimit)
}
  • 处理的请求类型createupdatedeletepatch 等写入操作
  • 判断依据!nonMutatingRequestVerbs.Has(requestInfo.Verb)
  • 目的:限制并发写入操作的数量

主要区别

  1. 目标请求不同

    • nonMutatingChan 针对读操作
    • mutatingChan 针对写操作
  2. 资源消耗的差异

    • 写操作通常消耗更多资源,可能会修改集群状态
    • 读操作相对轻量,但数量可能更大
  3. 限流阈值不同

    • API Server 配置中可以为这两种通道设置不同的限流阈值
    • 通常写操作的限流阈值会低于读操作
  4. 监控指标分离

    • 代码中对两种请求的监控也是分开的:
      watermark.recordMutating(len(c))
      watermark.recordReadOnly(len(c))
      
  5. 限流效果的差异

    • 当读请求过多时,只会拒绝新的读请求,写请求不受影响
    • 当写请求过多时,只会拒绝新的写请求,读请求不受影响

实现原理

两种通道的底层实现方式相同,都是使用带缓冲的 channel 作为信号量:

select {
case c <- true:  // 尝试获取令牌
    // 处理请求并记录指标
    defer func() {
        <-c  // 释放令牌
    }()
    handler.ServeHTTP(w, r)
default:
    // 无法获取令牌,返回 429 Too Many Requests
    tooManyRequests(r, w, retryAfter)
}

这种区分读写请求的限流设计,使 Kubernetes API Server 能够在高负载情况下更有效地分配资源,确保系统的稳定性和可用性。


限流参数 nonMutatingLimit 、mutatingLimit 意义:

这段代码:

if nonMutatingLimit == 0 && mutatingLimit == 0 {
    return handler
}

意思解释

这是一个前置检查,用于判断是否需要启用限流功能:

  • 如果 nonMutatingLimitmutatingLimit 都为 0,表示不需要对任何类型的请求进行限流
  • 在这种情况下,函数直接返回原始的 handler,不添加任何限流逻辑
  • 相当于完全跳过限流处理,请求会直接传递给下一个处理器

为什么需要这个检查

  1. 性能优化

    • 如果不需要限流,避免创建不必要的通道和记录指标的开销
    • 减少请求处理的额外层级,提高性能
  2. 功能开关

    • 提供一种方式完全禁用限流功能
    • 管理员可以通过配置参数控制是否启用限流
  3. 兼容性

    • 允许那些不需要限流的环境(如小型开发集群)简化配置
    • 保持与低资源环境的兼容性

实际应用

在 Kubernetes API Server 配置中,可以通过这些参数控制限流:

apiServer:
  maxRequestsInflight: 400          # nonMutatingLimit 参数
  maxMutatingRequestsInflight: 200  # mutatingLimit 参数

如果将这两个值都设为 0,API Server 将不会对任何请求进行并发限制,可能适用于:

  • 开发/测试环境
  • 低负载集群
  • 有外部限流机制的环境

这提供了一个简单的开关,使管理员能够灵活控制是否启用 API Server 的内置限流功能。


基于"通过通信共享内存"原则分析maxinflight.go

从Go语言的设计哲学"不要通过共享内存来通信,而要通过通信来共享内存"(Don’t communicate by sharing memory; share memory by communicating)来看,maxinflight.go有几个违背此原则的地方:

违背原则的地方

1. 共享状态管理方式

// 使用互斥锁保护共享状态
type requestWatermark struct {
    phase                                string
    readOnlyObserver, mutatingObserver   fcmetrics.RatioedGauge
    lock                                 sync.Mutex  // 互斥锁
    readOnlyWatermark, mutatingWatermark int
}

// 全局共享变量
var watermark = &requestWatermark{
    phase: metrics.ExecutingPhase,
}

这里使用了传统的"共享内存+锁"的并发控制模式,而不是Go推荐的基于通道的模式。

2. 记录指标的方法

func (w *requestWatermark) recordMutating(mutatingVal int) {
    w.mutatingObserver.Set(float64(mutatingVal))

    w.lock.Lock()
    defer w.lock.Unlock()

    if w.mutatingWatermark < mutatingVal {
        w.mutatingWatermark = mutatingVal
    }
}

这里直接修改共享状态,而不是通过消息传递。

3. 周期性更新指标

go wait.Until(func() {
    watermark.lock.Lock()
    readOnlyWatermark := watermark.readOnlyWatermark
    mutatingWatermark := watermark.mutatingWatermark
    watermark.readOnlyWatermark = 0
    watermark.mutatingWatermark = 0
    watermark.lock.Unlock()

    metrics.UpdateInflightRequestMetrics(watermark.phase, readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod, stopCh)

使用锁直接访问和修改共享状态,而不是使用通道接收信息。

符合Go哲学的重构方案

根据"通过通信共享内存"的原则,可以这样重构:

// 定义度量指标更新消息
type MetricUpdate struct {
    ReadOnly int
    Mutating int
}

// 创建通道
var metricCh = make(chan MetricUpdate)
var readOnlyMetricCh = make(chan int)
var mutatingMetricCh = make(chan int)

// 启动指标收集器goroutine
func startMetricCollector(stopCh <-chan struct{}) {
    readOnlyMax := 0
    mutatingMax := 0
    
    // 定期更新指标
    ticker := time.NewTicker(inflightUsageMetricUpdatePeriod)
    defer ticker.Stop()
    
    for {
        select {
        case val := <-readOnlyMetricCh:
            if val > readOnlyMax {
                readOnlyMax = val
            }
        case val := <-mutatingMetricCh:
            if val > mutatingMax {
                mutatingMax = val
            }
        case <-ticker.C:
            // 更新并重置最大值
            metrics.UpdateInflightRequestMetrics(metrics.ExecutingPhase, readOnlyMax, mutatingMax)
            readOnlyMax = 0
            mutatingMax = 0
        case <-stopCh:
            return
        }
    }
}

// 记录指标的新函数
func recordReadOnly(val int) {
    select {
    case readOnlyMetricCh <- val:
        // 成功发送
    default:
        // 通道已满,丢弃
    }
}

func recordMutating(val int) {
    select {
    case mutatingMetricCh <- val:
        // 成功发送
    default:
        // 通道已满,丢弃  
    }
}

为什么现有实现没有使用通道模式

尽管代码违背了Go的设计哲学,但有几个可能的原因:

  1. 历史兼容性:可能是从早期版本演化而来,完全重构成本高

  2. 性能考虑:在高频调用的代码路径上,锁可能比通道有更低的开销

  3. 简单直接:对于简单的计数器场景,锁实现可能更直观

  4. 限流部分确实用了通道

    nonMutatingChan = make(chan bool, nonMutatingLimit)
    mutatingChan = make(chan bool, mutatingLimit)
    

    这部分确实体现了Go的设计哲学,使用通道的缓冲区容量来限制并发请求数

总结

maxinflight.go中的代码部分遵循了Go的设计哲学(使用通道进行限流),但指标收集部分仍然采用了传统的"共享内存+锁"模式。一个更符合Go哲学的实现应该将指标收集也改为基于通道的模式,消除所有对共享内存的直接访问和修改。