K8S数据流核心底层逻辑剖析

发布于:2025-07-11 ⋅ 阅读:(21) ⋅ 点赞:(0)

一、背景

        之前也在学习使用K8S,但是仅仅停留在Pod控制器的部署使用、Service、Ingress、Pod等等层面,底层的数据流逻辑没去细究。 最近花了点时间去详细剖析了一下,和大家做个分享。

        我查询过很多资料,B站、CSDN各种资料,我发现几乎没人能把底层的逻辑讲清楚,或者说K8S的整个数据流架构思想讲清楚。 真的这个思想不复杂,但是确实没人讲得很透

        一方面可能入门门槛较高,大家觉得会用都已经很不错了,细究原理比较少。还有的就是,可能大家也不想讲,要么收费课程啥的,讲得含糊其辞,怕别人理解了似的😂。

        所以我使用最通俗易懂的方式来讲解k8s数据流的核心底层逻辑,可以有利于我们深入学习k8s,也可以帮我们去理解如何做CRD二次开发,开发自定义的CRD、CRD控制器等等。

二、架构图

核心逻辑总结:

        1、提交期望的部署资源对象清单数据,到kube-apiserver, kube-apiserver校验,存储清单数据到etcd数据库

        2、资源对象controller程序,会持续订阅watch监听kube-apiserver关于该类型的资源对象event事件,例如新增、更新、删除等等。  得到对应类型event,执行对应逻辑,例如创建Pod、更新Pod、删除Pod等等。  会按照ETCD存储的清单目标,通过自己的逻辑无限逼近这个目标的达成,如果能够达成最好,达不成也会例如重试等机制,往目标状态贴近,直到最后目标状态一致

        3、K8S无外乎就是, 官方定义了很多内置/通用性强的资源对象类型,并且这些资源对象类型都有对应的controller控制器的代码实现.  如果内置对象不满足你的需求,同时也支持你扩展,定义自定义CRD、CRD控制器完成自定义的资源对象逻辑

整体逻辑就是如此,万变不离其宗。  无外乎我们都在学习Deployment、Pod、Service等等的字段,功能等等。 

三、CRD、CRD控制器

0、需求

        1、需求是创建一个名称叫DoubleDeployment的CRD

        2、DoubleDeployment存在2个字段:  replicas副本数、image镜像URL地址

        3、底层引用内置Deployment, 创建一个Deployment.  设置mainc存在一个容器,镜像地址是image,  并且副本数replicas = DoubleDeployment.replicas  x 2     也就是说的Double的含义

        4、也就是你一旦创建这个DoubleDeployment 资源对象,填写的replicas数量, 都会创建一个关联的Deployment, 并且这个Deployment的副本数 是 DoubleDeployment.replicas  x2  翻倍

1、CRD

        CRD类比,Java编程当中的class定义.  K8S内置的Deployment、DaemonSet、Pod等等资源对象,本质就是一种class类型.   既然类比class, 那么只是定一个这个数据类型或者资源对象的元数据,数据结构。 

        例如存在哪些字段,这些字段哪些是必须的、哪些非必须的,字段类型,字段的规则等等。仅仅只是定义了资源对象的数据结构

        下面就是一个简单的CRD清单:  DoubleDeployment-CRD.yml

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: doubledeployments.samplecontroller.demo.io
spec:
  group: "samplecontroller.demo.io"
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                replicas:
                  type: integer
                  minimum: 0
                  maximum: 100
                image:
                  type: string
              required:
                - replicas
                - image
  scope: Namespaced
  names:
    plural: doubledeployments
    singular: doubledeployment
    kind: DoubleDeployment
    shortNames:
      - dd

2、CRD实例/CRD资源清单

        光有class定义没啥作用,我们的目的是new生成这个对象实例,用于后续的使用。 CRD实例就是这么来的。  例如我们定义一个Deployment的yaml对象资源清单,本质就是类比new了这个Deployment类型的实例。

        存在CRD的资源清单/CRD实例: DoubleDeployment.yml

apiVersion: samplecontroller.demo.io/v1
kind: DoubleDeployment
metadata:
  name: test-dd
  namespace: default
spec:
  replicas: 2         #副本数2个, 但是DoubleDeployment CRD底层控制器会 x2 最终等于4个 pod
  image: nginx:alpine

3、CRD控制器

      存在了CRD,也存在了CRD实例,还是不够的,这些会被提交到ETCD数据库进行存储. 还需要CRD控制器对这个CRD类型数据进行watch监听,执行实际操作,才能达到我们的目的。使用Go编写了一个简单的CRD controller程序:

        main.go:

package main

import (
	"context"
	"fmt"
	"os"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	_ "k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"

	ctrl "sigs.k8s.io/controller-runtime"
)

// DoubleDeployment CRD 结构体
type DoubleDeployment struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec struct {
		Replicas int32  `json:"replicas"`
		Image    string `json:"image"`
	} `json:"spec"`
}

var (
	gvr = schema.GroupVersionResource{
		Group:    "samplecontroller.demo.io",
		Version:  "v1",
		Resource: "doubledeployments",
	}
)

func main() {
	klog.SetOutput(os.Stdout)
	klog.InitFlags(nil)
	ctx := context.Background()

	// 获取 kubeconfig(本地或集群内)
	config := ctrl.GetConfigOrDie()

	// 创建 dynamic client
	dynClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// 启动控制器主循环
	for {
		// 列出所有 DoubleDeployment 资源
		list, err := dynClient.Resource(gvr).Namespace("default").List(ctx, metav1.ListOptions{})
		if err != nil {
			klog.ErrorS(err, "Failed to list DoubleDeployment")
			continue
		}

		for _, item := range list.Items {
			var dd DoubleDeployment
			if err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &dd); err != nil {
				klog.ErrorS(err, "Failed to convert unstructured", "name", item.GetName())
				continue
			}

			// 只处理未处理过的资源(比如没有标签标记)
			if item.GetLabels() == nil || item.GetLabels()["processed"] != "true" {
				if err := reconcileDeployment(ctx, config, &dd); err != nil {
					klog.ErrorS(err, "Failed to reconcile deployment", "name", dd.Name)
				} else {
					// 标记为已处理,避免重复创建
					item.SetLabels(map[string]string{"processed": "true"})
					_, uErr := dynClient.Resource(gvr).Namespace(dd.Namespace).Update(ctx, &item, metav1.UpdateOptions{})
					if uErr != nil {
						klog.ErrorS(uErr, "Failed to update CRD label", "name", dd.Name)
					}
				}
			}
		}

		select {
		case <-ctx.Done():
			return
		default:
			continue
		}
	}
}

// reconcileDeployment:根据 CR 创建 Deployment
func reconcileDeployment(ctx context.Context, config *rest.Config, dd *DoubleDeployment) error {
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}

	namespace := dd.Namespace
	if namespace == "" {
		namespace = "default"
	}

	deployName := dd.Name
	// 副本数 * 2 创建这个deployment
	replicas := int32(dd.Spec.Replicas * 2)
	image := dd.Spec.Image

	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      deployName,
			Namespace: namespace,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": deployName},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"app": deployName},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "mainc",
							Image: image,
						},
					},
				},
			},
		},
	}

	// 创建 Deployment
	_, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error creating deployment: %v\n", err)
		return err
	}
	fmt.Printf("✅ Created deployment: %s with replicas: %d\n", deployName, replicas)
	return nil
}

案例go代码,各位看官自取: 通过网盘分享的文件:crd-demo.zip 链接: https://pan.baidu.com/s/180C5MeTm-6ede_PFa9VdeQ?pwd=f42d 提取码: f42d

四、分享视频

        这里大家通过观看我的视频讲解,可能更加细致:

B站最清晰讲解,k8s(kubernetes)数据流核心底层逻辑-通俗易懂

五、总结

        通过本篇博文,你会恍然大悟,原来K8S的架构、数据流的底层原理也不过如此. 整体的玩法就是这么玩的, 不再云里雾里,不再觉得这玩意是庞然大物了。

        无论是运维还是开发,我们可以把K8S做为白盒去看待,遇到问题,也能知道怎么定位、怎么排查、怎么去开发CRD。

        希望能给大家一些思路上的一些启发~


网站公告

今日签到

点亮在社区的每一天
去签到