Github Action job 分配到集群
背景
job 是 Github Action 的基本单位,每个 job 单独分配一个 runner。workflow 由一个或者多个 job 组成。如果用户触发runs-on
字段为arc-runner-set
的 job,那么 Github Action 服务器将 job 分配给 listener pod。
源码
handleMessage
函数主要处理2类服务器消息。第一类是状态为started
的 job: job 已经由服务器分配给 runner 执行。HandleJobStarted
函数将 job 信息局部更新给EphemeralRunner
资源。
// handleMessage processes a single message received from the server.
//
// It parses msg, records its ID as the last seen message and asks the server
// to delete it, forwards every started job to handler, and finally passes the
// assigned-job total and the number of completed jobs to handler so that the
// desired runner count can be computed and published as a metric.
//
// It returns a wrapped error as soon as any step fails; later steps are not
// executed in that case.
func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *actions.RunnerScaleSetMessage) error {
	parsedMsg, err := l.parseMessage(ctx, msg)
	if err != nil {
		// Without a parsed message there is nothing to act on; continuing
		// would dereference parsedMsg below.
		return fmt.Errorf("failed to parse message: %w", err)
	}

	// Remember the message ID first: deleteLastMessage is called right after
	// and takes no message argument.
	l.lastMessageID = msg.MessageId
	if err := l.deleteLastMessage(ctx); err != nil {
		return fmt.Errorf("failed to delete message: %w", err)
	}

	for _, jobStarted := range parsedMsg.jobsStarted {
		if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
			return fmt.Errorf("failed to handle job started: %w", err)
		}
		l.metrics.PublishJobStarted(jobStarted)
	}

	desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs, len(parsedMsg.jobsCompleted))
	if err != nil {
		// Do not publish a runner count that the handler failed to apply.
		return fmt.Errorf("failed to handle desired runner count: %w", err)
	}
	l.metrics.PublishDesiredRunners(desiredRunners)
	return nil
}
第二类是状态为Assigned
和Completed
的 job。前者是还未结束的任务,后者是已经结束的任务。
HandleDesiredRunnerCount
函数首先调用setDesiredWorkerState
函数计算集群的 runner 数量。
// setDesiredWorkerState computes the runner count for the current batch,
// stores it in w.lastPatch, and returns the patch ID to send with it.
//
// count is parsedMsg.statistics.TotalAssignedJobs, i.e. the number of jobs
// that have not finished yet. jobsCompleted is len(parsedMsg.jobsCompleted),
// i.e. the number of jobs that have finished.
//
// The returned patch ID is the incremented batch sequence number, or 0 when
// the batch is empty and the runner count equals MinRunners; 0 is the marker
// used to trigger scale-down.
func (w *Worker) setDesiredWorkerState(count, jobsCompleted int) int {
	// Every call is a new batch, so the sequence number always advances.
	w.patchSeq++
	patchID := w.patchSeq

	// MinRunners and MaxRunners come from the minRunners / maxRunners values
	// configured in the runner scale set chart's values.yaml; combined with
	// the assigned job count they bound the number of runners.
	replicas := w.config.MinRunners + count
	if w.config.MaxRunners < replicas {
		replicas = w.config.MaxRunners
	}

	emptyBatch := count == 0 && jobsCompleted == 0
	if emptyBatch {
		// Nothing is running and nothing finished in this batch: never go
		// below the count from the previous patch.
		if w.lastPatch > replicas {
			replicas = w.lastPatch
		}
		if replicas == w.config.MinRunners {
			// No active jobs in this batch nor in the previous one: the
			// cluster is idle, so emit patch ID 0 to trigger scale-down.
			patchID = 0
		}
	}

	w.lastPatch = replicas
	return patchID
}
之后HandleDesiredRunnerCount
函数将批次序号和 runner 数局部更新给EphemeralRunnerSet
资源。
// Serialize a partial EphemeralRunnerSet that carries only the two spec fields to patch.
patch, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
Replicas: w.lastPatch, // runner count stored by setDesiredWorkerState (its targetRunnerCount)
PatchID: patchID, // batch sequence number (desiredPatchID in setDesiredWorkerState)
},
},
)
EphemeralRunnerSet
控制器根据批次序号和 runner 数更改 runner 资源。主要逻辑如下:
// total is the runner count taken from the cluster-side state (scaleTotal);
// it is independent of the GitHub Actions server.
total := ephemeralRunnerState.scaleTotal()
// Only act when PatchID is 0 or differs from the latest patch ID recorded in the runner state.
if ephemeralRunnerSet.Spec.PatchID == 0 || ephemeralRunnerSet.Spec.PatchID != ephemeralRunnerState.latestPatchID {
defer func() {
// Proactively delete EphemeralRunner resources that have already finished.
if err := r.cleanupFinishedEphemeralRunners(ctx, ephemeralRunnerState.finished, log); err != nil {
log.Error(err, "failed to cleanup finished ephemeral runners")
}
}()
log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas)
switch {
// Fewer runners in the cluster than the desired replica count: scale up.
case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up
count := ephemeralRunnerSet.Spec.Replicas - total
log.Info("Creating new ephemeral runners (scale up)", "count", count)
if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil {
log.Error(err, "failed to make ephemeral runner")
return ctrl.Result{}, err
}
// At least as many runners in the cluster as desired, with a non-zero PatchID:
// the GitHub Actions server already considers some jobs finished, but their
// runner pods have not reached a finished state yet. Nothing is done here;
// the runner pods are left to exit on their own.
case ephemeralRunnerSet.Spec.PatchID > 0 && total >= ephemeralRunnerSet.Spec.Replicas:
// PatchID == 0 is the scale-down marker set by setDesiredWorkerState; it keeps
// runner pods from lingering while the cluster is idle.
case ephemeralRunnerSet.Spec.PatchID == 0 && total > ephemeralRunnerSet.Spec.Replicas:
count := total - ephemeralRunnerSet.Spec.Replicas
if err := r.deleteIdleEphemeralRunners(
ctx,
ephemeralRunnerSet,
ephemeralRunnerState.pending,
ephemeralRunnerState.running,
count,
log,
); err != nil {
log.Error(err, "failed to delete idle runners")
return ctrl.Result{}, err
}
}
}
createEphemeralRunners
函数创建EphemeralRunner
资源。资源结构如下:
// Build an EphemeralRunner in the set's namespace, owned by the EphemeralRunnerSet
// and using the runner spec copied from the set.
return &v1alpha1.EphemeralRunner{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
GenerateName: ephemeralRunnerSet.Name + "-runner-", // Kubernetes appends a random suffix to the name, because one EphemeralRunnerSet corresponds to many EphemeralRunner resources
Namespace: ephemeralRunnerSet.Namespace,
Labels: labels,
Annotations: annotations,
// The owner reference points back at the EphemeralRunnerSet (kind, UID and name are read from it).
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: ephemeralRunnerSet.GetObjectKind().GroupVersionKind().GroupVersion().String(),
Kind: ephemeralRunnerSet.GetObjectKind().GroupVersionKind().Kind,
UID: ephemeralRunnerSet.GetUID(),
Name: ephemeralRunnerSet.GetName(),
Controller: boolPtr(true),
BlockOwnerDeletion: boolPtr(true),
},
},
},
Spec: ephemeralRunnerSet.Spec.EphemeralRunnerSpec,
}
EphemeralRunner
控制器根据EphemeralRunner
资源创建 runner pod。步骤如下:
- 添加 finalizer 字段。
finalizer
表示在集群中注销本资源。runner-registration-finalizer
表示在 Github Action 服务器注销本资源。
// Finalizer names added to an EphemeralRunner resource.
const (
// Finalizer for removing this resource from the cluster.
ephemeralRunnerFinalizerName = "ephemeralrunner.actions.github.com/finalizer"
// Finalizer for deregistering this resource from the GitHub Actions server.
ephemeralRunnerActionsFinalizerName = "ephemeralrunner.actions.github.com/runner-registration-finalizer"
)
- 向 Github Action 服务器注册自身。返回 RunnerId 表示注册成功。
// RunnerScaleSetJitRunnerSetting holds the runner settings passed when
// generating a JIT (just-in-time) runner config.
type RunnerScaleSetJitRunnerSetting struct {
// Name is the runner name.
Name string `json:"name"`
// WorkFolder is the runner's work directory.
WorkFolder string `json:"workFolder"`
}
// JIT (just-in-time) config: the registration information returned by the server.
// jitSettings is a RunnerScaleSetJitRunnerSetting holding the runner name and work folder.
// RunnerScaleSetId is the registration ID of the runner scale set this runner belongs to.
jitConfig, err := actionsClient.GenerateJitRunnerConfig(ctx, jitSettings, ephemeralRunner.Spec.RunnerScaleSetId)
// Record the registration result in the status of the EphemeralRunner resource.
err = patchSubResource(ctx, r.Status(), ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) {
obj.Status.RunnerId = jitConfig.Runner.Id // runner ID
obj.Status.RunnerName = jitConfig.Runner.Name // runner name
obj.Status.RunnerJITConfig = jitConfig.EncodedJITConfig // encoded JIT config, later stored in a secret
})
- 将 RunnerJITConfig 注册为 secret, runner pod 将通过 config 与 Github Action 服务器连接。
// The secret shares the EphemeralRunner's name and namespace and stores the
// runner's JIT config from its status under jitTokenKey.
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: ephemeralRunner.Name,
Namespace: ephemeralRunner.Namespace,
},
Data: map[string][]byte{
jitTokenKey: []byte(ephemeralRunner.Status.RunnerJITConfig),
},
}
- 创建 runner pod。runner pod 的镜像是
ghcr.io/actions/actions-runner
。它将与 Github Action 服务器建立连接。服务器分配 job 给 runner pod,服务器将 job 的任务内容交给 runner pod 执行。执行完成后,runner pod 结束。
// Append an environment variable whose value is read from the JIT config secret.
c.Env = append(
c.Env,
corev1.EnvVar{
Name: EnvVarRunnerJITConfig,
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: secret.Name,
},
Key: jitTokenKey, // expose the RunnerJITConfig secret entry to the runner container as an environment variable (not a volume mount)
},
},
},
)
- 监控 runner container 的状态。
总结
本文讲了 Github Action job 到 runner pod 的执行流程。