Pod调度
Pod调度: 通过污点、容忍度和亲和性影响Pod的调度
- 调度器实现, 其基于配置器构造(其配置来源于配置API)
- 调度过程中任何插件返回拒绝, 都会导致Pod可能再次返回调度队列
如: Pod调度简略流程
调度执行流程:
- 通过
SharedIndedInformer
过滤未调度的Pod放入调度队列 - 通过
SharedIndexInformer
过滤已调度的Pod更新调度缓存 - 从调度队列中取出个Pod,通过其
SchedulerName
执行特定调度框架 - 调度框架触发调度算法利用调度缓存为Pod选择最优的Node进行异步绑定
实现原理
调度器(Scheduler): Pod调度决策
- 调度过程的数据依赖于瞬间的缓存
- Pod调度完成后需等待调度插件的批准才可执行绑定
- 基于模块: 调度队列、调度缓存、调度框架、调度算法
以下源码分析均都基于v1.28.1
版本的Kubernetes源码
// https://github1s.com/kubernetes/kubernetes/blob/HEAD/pkg/scheduler/scheduler.go#64
// Scheduler 监视未调度的Pod并尝试找到适合的Node, 并将绑定信息写回到API Server
type Scheduler struct {
// Cache 调度缓存, 缓存所有的Node状态
// 每次调度前都需更新快照, 以便调度算法使用
Cache internalcache.Cache
// Extenders 调度插件
Extenders []framework.Extender
// NextPod 获取下个要调度的Pod, 没有则阻塞goroutine
// 不能直接从调度队列中获取调度的Pod, 其不能日志记录调度Pod
NextPod func() (*framework.QueuedPodInfo, error)
// FailureHandler 调度时的回调错误函数
FailureHandler FailureHandlerFn
// SchedulePod 调度算法给出Pod可调度的Node
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// StopEverything 关闭Scheduler的信号
StopEverything <-chan struct{}
// SchedulingQueue 缓存等待调度的Pod
SchedulingQueue internalqueue.SchedulingQueue
// Profiles 调度框架的配置(Profile和Frameword属于1:1)
Profiles profile.Map
client clientset.Interface
nodeInfoSnapshot *internalcache.Snapshot
percentageOfNodesToScore int32
nextStartNodeIndex int
logger klog.Logger
registeredHandlers []cache.ResourceEventHandlerRegistration
}
// New 调度器构造函数
func New(ctx context.Context,
client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
opts ...Option) (*Scheduler, error) {
// Kubernetes内部记录器, 并获取终止信号
logger := klog.FromContext(ctx)
stopEverything := ctx.Done()
// 在默认的schedulerOptions基础上应用所有的opts
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
// 是否应用默认调度框架配置
if options.applyDefaultProfile {
var versionedCfg configv1.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
options.profiles = cfg.Profiles
}
// 创建InTree插件Factory注册表, 并与OutTree插件Factory注册表合并以形成插件Factory注册表
// 数据类型为(插件Factory就是插件的构造函数): map[插件名称]插件Factory
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
// 注册调度器的度量标准
metrics.Register()
// 调度器的扩展程序
extenders, err := buildExtenders(logger, options.extenders, options.profiles)
if err != nil {
return nil, fmt.Errorf("couldn't build extenders: %w", err)
}
// 获取Pod和Node数据
// 以便做成快照数据, 提供调度依据
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
// 初始化快照数据和度量指标记录器(异步)
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
// 初始化调度器的配置文件
profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
frameworkruntime.WithExtenders(extenders),
frameworkruntime.WithMetricsRecorder(metricsRecorder),
)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}
// 配置中添加PreEnqueue和Queueing Hint
// PreEnqueue用于在Pod加入调度队列的前置操作
// Queueing Hint过滤事件, 防止调度Pod时的无用重试
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
}
// 创建调度队列
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodLister(podLister),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)
// 假定调度指向该调度器的调度队列
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}
// 调度缓存
// durationToExpireAssumedPod 代表绑定的TTL
// 若该时间内未完成绑定, 则从调度缓存中移除该假定调度Pod
schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
// 调度器的Debugger
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(ctx)
// 创建调度器, 并从调度队列中弹出个Pod开始调度
// 并指定调度器默认的调度流程和调度失败的回调函数
sched := &Scheduler{
Cache: schedulerCache,
client: client,
nodeInfoSnapshot: snapshot,
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders,
StopEverything: stopEverything,
SchedulingQueue: podQueue,
Profiles: profiles,
logger: logger,
}
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()
// 注册事件处理函数
// 新建Pod先放入调度队列, 绑定成功后由该函数更新调度缓存以确认
// 本质: 通过SharedIndexInformer监控Pod和NService等调度依赖的资源, 并根据事件执行对应操作
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}
return sched, nil
}
调度器执行流程如下(主要通过scheduleOne()
方法实现):
- 调度器从调度队列中获取个Pod
- 先通过
NextPod()
方法从调度队列中获取Pod(可记录日志) - 调度器会循环从调度队列中获取Pod, 没有待调度的Pod就直接返回
- 先通过
- 对获取的Pod做调度前的预处理
- 获取Pod指定的调度框架(
spec.SchedulerName
) - 配置Pod调度的环境(计时、
CycleState
和schedulingCycleCtx
等)
- 获取Pod指定的调度框架(
- 判断是否忽略Pod的调度
- Pod已被删除, 则直接忽略
- Pod被更新, 但已调度/假定调度(根据Pod的更新决定是否重新调度)
- 通过
Score
匹配最优NodeScore
会基于多种因素计算出最适合Pod的Node- 若没有Node能满足Pod的资源需求, 则Pod通过
PostFilter
进行抢占式调度 - 若Pod是抢占式调度的, 则Pod当前依然是不可调度的(需等待被抢占的Pod优雅退出)
- 记录所有调度失败的原因(
FailureHandler()
)- 记录导致Pod调度失败的事件(
kubectl describe pod
命令时的信息) - 从
SharedIndexInformer
缓存中获取Pod最新状态, 决定是否将调度失败的Pod再次放回调度队列
- 记录导致Pod调度失败的事件(
- 假定调度Pod(调度器无需等待可立刻调度下个Pod)
- 调度缓存中更新Pod已绑定匹配的Node
- 若TTL内未绑定成功, 则判定假定调度失败并从调度缓存中删除绑定
- 为Pod预留全局资源
- 若预留资源失败, 则删除已预留的资源和调度缓存中假定调度信息并记录失败信息
- 判定Pod是否可进入绑定周期
- 需等待所有插件批准才可执行绑定
- 若未批准会执行: 删除预留资源、删除假定调度Pod、记录失败信息
- Pod绑定Node, 异步执行
- 绑定预处理(按顺序执行调度框架的各个插件)
- 执行绑定, 向API Server写入信息(先
Extender
后Bind
, 因部分资源只有前者可管理) - 若绑定成功, 则通知调度缓存并记录绑定成功事件(绑定失败也会记录事件)
- 若绑定失败会执行: 删除预留资源、删除假定调度Pod、记录失败信息
如: 调度整体流程
调度队列
调度队列(SchedulingQueue): Pod调度过程中的Pod获取顺序
- 调度队列具有幂等性(每次操作前均判断是否已存在)
- 调度队列的实现是个优先队列(由传入的函数决定其优先级)
优先队列
优先队列(PriorityQueue): 基于map以优先级方式实现调度队列
- 优先队列由三个子队列构成: 就绪队列、不可调度队列、退避队列
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L150
// 实现SchedulingQueue接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L90
// PriorityQueue 优先级方式实现的调度队列
//
// activeQ(就绪队列): 存储等待被调度的Pod(从该队列中Pop出的Pod), 默认存储新添加的Pod
// backoffQ(退避队列): 存储等待特定时间后可调度的Pod(再次放入activeQ), 等待时间根据尝试次数进行指数级增长(默认上限10s)
// unschedulablePods(不可调度队列): 存储由各种原因导致无法调度的Pod, 经过特定周期后会再次加入activeQ(默认60s)
type PriorityQueue struct {
*nominator
stop chan struct{}
clock clock.Clock
// podInitialBackoffDuration Pod的初始退避时间, 默认1s
// Pod后续每次调度失败, 该时间就以二次方增加
podInitialBackoffDuration time.Duration
// podMaxBackoffDuration Pod的最大退避时间, 默认10s
podMaxBackoffDuration time.Duration
// podMaxInUnschedulablePodsDuration Pod可处于unschedulablePods的最大时间
podMaxInUnschedulablePodsDuration time.Duration
cond sync.Cond
// inFlightPods 返回所有当前正在处理的Pod
inFlightPods map[types.UID]inFlightPod
// receivedEvents 返回调度队列收到的所有事件
receivedEvents *list.List
// activeQ 存储待调度的Pod
// 头部Pod是具有最高优先级的Pod(最先调度)
activeQ *heap.Heap
// podBackoffQ 按照退避时间到期排序
// 完成退避的Pod将在调度器查看activeQ之前从其中弹出
podBackoffQ *heap.Heap
// unschedulablePods 返回已尝试并确定无法调度的Pod
unschedulablePods *UnschedulablePods
// schedulingCycle 返回调度周期
schedulingCycle int64
// moveRequestCycle 缓存移动请求时的调度周期
// 当接受到移动请求并正在调度不可调度Pod时, 则将其放回activeQ
moveRequestCycle int64
// preEnqueuePluginMap PreEnqueue插件的配置(K为配置文件名)
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
// queueingHintMap Queueing Hint插件的配置(K为配置文件名)
queueingHintMap QueueingHintMapPerProfile
// closed 队列是否关闭
closed bool
nsLister listersv1.NamespaceLister
metricsRecorder metrics.MetricAsyncRecorder
pluginMetricsSamplePercent int
isSchedulingQueueHintEnabled bool
}
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/queue/scheduling_queue.go#L1261
// UnschedulablePods 不可调度Pod的队列, 对Map的再次封装
type UnschedulablePods struct {
// podInfoMap 存储Pod的map, K为Pod的名称
podInfoMap map[string]*framework.QueuedPodInfo
// keyFunc 获取对象K的函数
keyFunc func(*v1.Pod) string
// unschedulableRecorder 监控数据
unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
如: 优先队列的流程
底层数据
底层数据: 封装用于调度队列存储对象的底层数据结构
- Heap: 对map的再次封装, 具有slice的顺序性和map的高效检索能力
- QueuedPodInfo: 对Pod的再次封装, 具有调度队列存储相关的信息
// https://github1s.com/kubernetes/kubernetes/blob/HEAD/pkg/scheduler/framework/types.go#167
// QueuedPodInfo 在Pod基础上封装关于调度队列的信息
type QueuedPodInfo struct {
*PodInfo
// Timestamp Pod添加到调度队列的时间
// Pod可能会频繁从取出再放入, 该时间便于处理Pod
Timestamp time.Time
// Attempts Pod重试调度的次数
Attempts int
// InitialAttemptTimestamp Pod首次添加到调度队列的时间
// 初始化后不再更新, 用于计算调度完成所需时间
InitialAttemptTimestamp *time.Time
// UnschedulablePlugins Pod调度周期中导致失败的插件名称
// 仅对PreFilter, Filter, Reserve, Permit(WaitOnPermit)插件有效
UnschedulablePlugins sets.Set[string]
// Gated 是否由 PreEnqueuePlugin 调度
Gated bool
}
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/heap/heap.go#L127
// Heap 实现堆数据结构的生产者/消费者队列, 可用于优先级队列类似的数据结构
type Heap struct {
// data 存储数据对象
data *data
// metricRecorder 监控数据
metricRecorder metrics.MetricRecorder
}
// data 实现标准库的 Heap 接口
type data struct {
// items 通过map管理所有对象
items map[string]*heapItem
// queue 通过slice管理所有对象的K(namespace + pod name)
queue []string
// keyFunc 获取对象K的函数
// 用于操作queue, 应保证该函数的确定性
keyFunc KeyFunc
// lessFunc 比较两个对象的函数(用于排序)
lessFunc lessFunc
}
调度缓存
调度缓存(SchedulerCache): 获取Etcd中Pod和Node的绑定等调度相关所需的信息
- Node信息中已包含所有运行在该Node上的Pod信息
- 调度缓存会维护段时间已删除的Node, 直到Node没有Pod
- 调度缓存中的Node有虚实之分, 虚Node实现Node增加/删除时的正常调度(
nodeTree
仅存储实Node)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/cache.go#L57
// 实现Cache接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/interface.go#L60
// cacheImpl 实现调度缓存接口
type cacheImpl struct {
// top 调度缓存的停止Chan
stop <-chan struct{}
// ttl 假定调度绑定的超时时间, 默认30s
ttl time.Duration
// period 定期清除假定调度绑定超时的Pod, 默认60s
period time.Duration
// mu 读写锁保证并发安全
mu sync.RWMutex
// assumedPods 假定调度Pod集合
assumedPods sets.Set[string]
// podStates 所有Pod信息
podStates map[string]*podState
// nodes 所有Node信息
nodes map[string]*nodeInfoListItem
// headNode Node双向链表中首个Node
// 基于特定规则排序, 链表的排序效率高于slice
headNode *nodeInfoListItem
// nodeTree 节点按照zone组成成的树状数据结构
nodeTree *nodeTree
// imagesStates 镜像状态
imageStates map[string]*imageState
}
快照(Snapshot): 调度缓存某瞬间下的副本
- 作用:通过增量更新和只读, 避免频繁获取和读写锁损失性能
- 调度器在执行每个调度周期前, 都会获取个快照作为调度的数据依据
- 每次调度都会根据调度缓存更新快照中的Node信息以保证状态一致(仅更新部分信息)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/internal/cache/snapshot.go#L29
// Snapshot 缓存 NodeInfo 和 NodeTree 快照
type Snapshot struct {
// nodeInfoMap Node信息(map[namespace + name]NodeInfo)
nodeInfoMap map[string]*framework.NodeInfo
// nodeInfoList 按照NodeTree排序的Node全集列表(不包含已删除的Node)
nodeInfoList []*framework.NodeInfo
// havePodsWithAffinityNodeInfoList 处理具有亲和性的Pod
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
// havePodsWithRequiredAntiAffinityNodeInfoList 处理具有反亲和性的Pod
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
// usedPVCSet 调度Pod使用的PVC
usedPVCSet sets.Set[string]
// generation Node的配置纪元
// 所有NodeInfo.Generation中的最大值(其均源于全局Generation变量)
generation int64
}
假定调度Pod(Assume): 调度结果写入Etcd
- 异步绑定假定结果, 调度器继续调度其他Pod以保证性能
- 假定调度绑定时会预先占用资源防止再次分配, 但真正绑定失败会释放占用资源
- 假定调度Pod具有绑定限定时间, 超时未真正绑定会释放占用资源和清除假定调度Pod
- 当真正绑定时会删除假定调度Pod, 并对假定调度Pod占用资源进行转移
如: 调度缓存流程
调度框架
调度框架: Kubernetes调度器的插件架构(调度插件的集合)
- 扩展点: 调度插件注册后执行位置(提供信息或调度决策)
- Pod的调度流程分为两个周期: 调度周期(串行)、绑定周期(并行)
- 句柄(Handler): 为插件提供服务(提供额外功能, 协助插件完成功能)
- 配置后的调度框架就是个调度器, 配置后的调度插件就是个调度算法
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/runtime/framework.go#L49
// 实现Framework接口: https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/interface.go#L512
// frameworkImpl
type frameworkImpl struct {
// registry 调度插件注册表, 通过其创建配置的插件
registry Registry
// snapshotSharedLister 基于快照的Lister
snapshotSharedLister framework.SharedLister
// waitingPods 存储等待批准的Pod
waitingPods *waitingPodsMap
// scorePluginWeight 插件的权重映射(K为插件名称)
scorePluginWeight map[string]int
// 所有扩展点的插件, 用于实现Framework接口的各个方法
// 每个扩展点都会遍历执行插件, 且均在构造函数中通过SchedulingProfile生成
preEnqueuePlugins []framework.PreEnqueuePlugin
enqueueExtensions []framework.EnqueueExtensions
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
postFilterPlugins []framework.PostFilterPlugin
preScorePlugins []framework.PreScorePlugin
scorePlugins []framework.ScorePlugin
reservePlugins []framework.ReservePlugin
preBindPlugins []framework.PreBindPlugin
bindPlugins []framework.BindPlugin
postBindPlugins []framework.PostBindPlugin
permitPlugins []framework.PermitPlugin
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
logger klog.Logger
metricsRecorder *metrics.MetricAsyncRecorder
profileName string
percentageOfNodesToScore *int32
extenders []framework.Extender
framework.PodNominator
parallelizer parallelize.Parallelizer
}
如: 调度周期和绑定周期执行流程(调度上下文)
- Kubernetes集群中可存在多个调度框架
- 扩展点是插件的设计接口, 调度需12个插件
- 带有Pre前缀的插件都是提供信息的, 其他均是做决策的
- 调度框架就是个调度器, 配置好的调度插件就是调度算法
调度插件: 影响Pod调度的各组件
- 调度插件分为多种, 而每个调度插件可有多种实现
- 调度插件都虚静态编译到注册中, 且通过唯一性的名称区分
- 插件均是无状态的(插件存储状态需依赖外部实现), 且插件间通信依赖于
CycleState
- 每种插件类型可实现多种类型的插件接口(该插件可在插件框架中的多个扩展位置作用)
// https://github1s.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/cycle_state.go#L48
// CycleState 基于共享变量实现插件之间数据传输
// 仅作为单词调度周期中, 各个插件之间通信(调度上下文)
//
// 未提供任何数据保护, 对所有插件都认为是可信的
type CycleState struct {
// storage 存储数据
storage sync.Map
// recordPluginMetrics 是否监控
recordPluginMetrics bool
// SkipFilterPlugins 将在Filter扩展点忽略的插件
SkipFilterPlugins sets.Set[string]
// SkipScorePlugins 将在Score扩展点忽略的插件
SkipScorePlugins sets.Set[string]
}
调度框架中各调度插件的说明:
插件名称 | 说明 |
---|---|
Sort | 排序等待调度的Pod (默认按照优先级) |
PreFilter | 处理Pod相关信息为过滤Node做准备 (过滤前的处理) |
Filter | 过滤无法运行该Pod的Node (对多节点并发应用多个Filter插件) |
PostFilter | 抢占调度 (仅在Filter过滤不出Node时执行) |
PreScore | 处理Pod相关信息为Node评分做准备 (主要处理亲和性、拓扑分步和容忍度) |
Score | 对所有过滤的Node评分并排序 (首个Node则为Pod的最优选择) |
NormalizeScore | 修改已排序的Node评分 (提高Node评分的扩展性) |
Reserve | 维护全局调度状态 (防止下次调度与本次绑定完成前发生竞争) |
Premit | 标注Pod状态以防止或延迟Pod绑定 (Pod状态可为: 批准、等待、延迟) |
Prebind | 处理Pod绑定需完成的操作 (常用于完成PV和PVC) |
Bind | Pod与Node绑定 (仅作用单个Bind插件) |
PostBind | 清理绑定期间使用的资源 (没有默认实现, 用于自定义扩展) |
- Pre前缀的插件是预处理并提供信息, 同时避免部分真正重量操作重复执行