kube-apiserver源码分析

发布于:2022-12-20 ⋅ 阅读:(714) ⋅ 点赞:(0)

本文主要看一下apiserver的启动及三种server安装路由的流程。

kube-apiserver进程启动入口

//cmd/kube-apiserver/apiserver.go
import (
	"k8s.io/kubernetes/cmd/kube-apiserver/app"
	...
)

// main is the kube-apiserver process entry point: it builds the cobra command
// returned by app.NewAPIServerCommand and executes it, exiting with status 1
// when execution fails.
func main() {
	rand.Seed(time.Now().UnixNano())

	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

	command := app.NewAPIServerCommand()

	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		// os.Exit terminates the process without running deferred calls, so
		// the logs are flushed explicitly before exiting on the error path.
		logs.FlushLogs()
		os.Exit(1)
	}
}

使用cobra框架,最终调用Run

//cmd/kube-apiserver/app/server.go
//通过包导入的方式注册apiserver资源
import (
	//创建extensions全局注册表,并注册extensions资源到extensions全局注册表
	extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
	//创建aggregator全局注册表,并注册aggregator资源到aggregator全局注册表
	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
	//创建kubeapiserver全局注册表
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	//注册kubeapiserver资源到kubeapiserver全局注册表
	"k8s.io/kubernetes/pkg/controlplane"
	...
)

// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
	// NOTE: abridged excerpt. Only the RunE callback of the command is shown;
	// the options value s and the final return are not part of this excerpt.
	cmd := &cobra.Command{
		// RunE completes the options with Complete and then calls Run, passing
		// the stop channel returned by genericapiserver.SetupSignalHandler.
		RunE: func(cmd *cobra.Command, args []string) error {
			completedOptions, err := Complete(s)
			return Run(completedOptions, genericapiserver.SetupSignalHandler())
		}
	}
}

Run主流程,短短几行,但其内部的实现流程还是比较长的,接下来慢慢看

// Run runs the specified APIServer.  This should never exit.
// It builds the server chain, prepares it, and runs the prepared server with
// stopCh.
// NOTE: abridged excerpt; the error checks between the steps are not shown.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
   // To help debugging, immediately log version
   klog.Infof("Version: %+v", version.Get())

   // Create the server chain. The kube-apiserver process starts three kinds of
   // servers that serve HTTP as a chain; this is covered in detail later.
   server, err := CreateServerChain(completeOptions, stopCh)

   // Preparation work before the server is started.
   prepared, err := server.PrepareRun()

   // Finally start the HTTP server and begin accepting client requests.
   return prepared.Run(stopCh)
}

CreateServerChain

// CreateServerChain creates the apiservers connected via delegation.
// The servers are created back to front (extension server, kube-apiserver,
// aggregator), each one receiving the previously created server as its
// delegation target; the aggregator server is returned.
// NOTE: abridged excerpt; the error checks after each step are not shown.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
	// Build the common configuration from the command-line options.
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)

	// Derive the configuration needed by the extension server from the common configuration.
	// If additional API servers are added, they should be gated.
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))

	// Create the extension server from its configuration.
	// The extension server is the last server in the chain, so it is given
	// NewEmptyDelegate: there is no further server to delegate to, and a
	// request it cannot handle results in a 404 error.
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())

	// Create kube-apiserver from the common configuration, passing the
	// extension server to it as its delegation target.
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

	// Derive the configuration needed by the aggregator server from the common configuration.
	// aggregator comes last in the chain
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)

	// Create the aggregator server from its configuration, passing
	// kube-apiserver to it as its delegation target.
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

	// Only aggregatorServer needs to be returned: it is the first server of
	// the chain. Every request goes through it first; a request it cannot
	// handle is passed on to kube-apiserver and, if that cannot handle it
	// either, finally to the extension server.
	return aggregatorServer, nil
}

CreateKubeAPIServerConfig

根据命令行参数创建通用配置
在这里插入图片描述

// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
// NOTE: abridged excerpt; elided parts are marked with "..." and include the
// declaration of proxyTransport and the error handling.
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
	*controlplane.Config,
	aggregatorapiserver.ServiceResolver,
	[]admission.PluginInitializer,
	error,
) {
	// Build the generic server configuration and its companions from the run options.
	genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)

	// Wrap the generic configuration together with the kube-apiserver specific
	// extra configuration. The service IP ranges and the API server service IP
	// are copied from the completed options; the service port is fixed at 443.
	config := &controlplane.Config{
		GenericConfig: genericConfig,
		ExtraConfig: controlplane.ExtraConfig{
			...
			ServiceIPRange:          s.PrimaryServiceClusterIPRange,
			APIServerServiceIP:      s.APIServerServiceIP,
			SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
			
			APIServerServicePort: 443,
			...
		}
	
	...
	
	return config, serviceResolver, pluginInitializers, nil
}

// buildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
// All results are named return values; the function reports failure through
// lastErr and uses bare returns.
// NOTE: abridged excerpt; elided parts are marked with "..." and include the
// declarations of err, kubeClientConfig and clientgoExternalClient.
func buildGenericConfig(
	s *options.ServerRunOptions,
	proxyTransport *http.Transport,
) (
	genericConfig *genericapiserver.Config,
	versionedInformers clientgoinformers.SharedInformerFactory,
	serviceResolver aggregatorapiserver.ServiceResolver,
	pluginInitializers []admission.PluginInitializer,
	admissionPostStartHook genericapiserver.PostStartHookFunc,
	storageFactory *serverstorage.DefaultStorageFactory,
	lastErr error,
) {
	genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)

	...
	
	// Build the authenticator from the configuration.
	// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
	if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
		return
	}
	
	// Build the authorizer from the configuration.
	genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
	if err != nil {
		lastErr = fmt.Errorf("invalid authorization config: %v", err)
		return
	}
	
	// Set up admission control from the configuration.
	// NOTE(review): in this excerpt the error returned by ApplyTo is not copied
	// into lastErr before returning — presumably elided; verify against the
	// full source.
	err = s.Admission.ApplyTo(
		genericConfig,
		versionedInformers,
		kubeClientConfig,
		utilfeature.DefaultFeatureGate,
		pluginInitializers...)

	return
}

1. createAPIExtensionsServer

apiextensionsserver用于处理对CRD资源的CRUD请求。

//cmd/kube-apiserver/app/apiextensions.go
// createAPIExtensionsServer completes the given extension server configuration
// and builds the CustomResourceDefinitions server from it, wired to
// delegateAPIServer as its delegation target.
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
	completed := apiextensionsConfig.Complete()
	return completed.New(delegateAPIServer)
}

//k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New returns a new instance of CustomResourceDefinitions from the given config.
// NOTE: abridged excerpt; elided parts are marked with "..." (including the
// error handling and the final return).
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	// Create the generic server, passing the server name and the delegation
	// target; this function is explained separately later.
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)

	// The CRD server struct.
	s := &CustomResourceDefinitions{
		GenericAPIServer: genericServer,
	}

	apiResourceConfig := c.GenericConfig.MergedResourceConfig
	//const GroupName = "apiextensions.k8s.io"
	// Create the apiGroupInfo.
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
		// The indented lines below are not part of New: the article inlines
		// what NewDefaultAPIGroupInfo returns.
		return APIGroupInfo{
			//k8s.io/apiserver/pkg/server/genericapiserver.go
			// Get all versions of the group's resources from the resource
			// registry (scheme), higher priority first.
			PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
			VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
			// TODO unhardcode this.  It was hardcoded before, but we need to re-evaluate
			OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
			Scheme:                 scheme,
			ParameterCodec:         parameterCodec,
			NegotiatedSerializer:   codecs,
		}

	//var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
	if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
		// storage is the underlying storage interface; each resource has its
		// own storage. When a create-CRD request is received, the storage
		// interface is called to persist the data to etcd.
		storage := map[string]rest.Storage{}
		// customresourcedefinitions
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)

		storage["customresourcedefinitions"] = customResourceDefinitionStorage
		storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
	}

	// Install the two resources above; the exposed RESTful paths are:
	// /apis/apiextensions.k8s.io/v1/customresourcedefinitions
	// /apis/apiextensions.k8s.io/v1/customresourcedefinitions/status
	// InstallAPIGroup ends up calling installAPIResources, which is analysed
	// in detail later.
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}

	crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
	s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

	// Get the delegation target's handler; if it is nil, fall back to the
	// default NotFound handler.
	delegateHandler := delegationTarget.UnprotectedHandler()
	if delegateHandler == nil {
		delegateHandler = http.NotFoundHandler()
	}

	versionDiscoveryHandler := &versionDiscoveryHandler{
		discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
		delegate:  delegateHandler,
	}
	groupDiscoveryHandler := &groupDiscoveryHandler{
		discovery: map[string]*discovery.APIGroupHandler{},
		delegate:  delegateHandler,
	}
	establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
	crdHandler, err := NewCustomResourceDefinitionHandler(
		versionDiscoveryHandler,
		groupDiscoveryHandler,
		s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
		delegateHandler,
		...
	)

	// Register the non-go-restful paths.
	// Exact match on /apis: a request whose path is exactly /apis is served by crdHandler.
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
	// Prefix match on /apis/: a request whose path starts with /apis/ is served by crdHandler.
	s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
	...
}

NewREST
每种k8s资源都需要对外提供restful风格的资源存储服务API,而且必须实现下面的接口

// Storage is the interface a resource's REST storage must implement; as shown
// here it only requires a constructor for empty objects.
type Storage interface {
	// New returns an empty object that can be used with Create and Update after request data has been put into it.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	New() runtime.Object
}

REST实现了上面的New接口

//k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// REST is the storage type for CustomResourceDefinitions. It embeds
// *genericregistry.Store, so the Store's methods (such as New) are promoted
// to REST.
type REST struct {
	*genericregistry.Store
}

//k8s.io/apiserver/pkg/registry/generic/registry/store.go
// Store is the generic registry store embedded by the resource-specific REST
// types.
// NOTE: abridged excerpt; only the NewFunc field is shown.
type Store struct {
	// NewFunc returns a new instance of the type this registry returns for a
	// GET of a single object, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
	NewFunc func() runtime.Object
	...
}

// New implements RESTStorage.New: Store satisfies the New() method of the
// storage interface by handing back a fresh object produced by NewFunc.
func (e *Store) New() runtime.Object {
	obj := e.NewFunc()
	return obj
}

NewREST初始化结构体genericregistry.Store

//k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
// It fills in a genericregistry.Store for CustomResourceDefinition objects,
// completes it with the REST options from optsGetter, and wraps it in REST.
// NOTE: abridged excerpt; the "..." hides, among others, the declaration of
// strategy.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	...
	// The genericregistry.Store struct is defined in
	// k8s.io/apiserver/pkg/registry/generic/registry/store.go
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },
		NewListFunc:              func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },
		PredicateFunc:            MatchCustomResourceDefinition,
		DefaultQualifiedResource: apiextensions.Resource("customresourcedefinitions"),

		CreateStrategy:      strategy,
		UpdateStrategy:      strategy,
		DeleteStrategy:      strategy,
		ResetFieldsStrategy: strategy,

		// TODO: define table converter that exposes more than name/creation timestamp
		TableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),
	}
	
	// CompleteWithOptions fills in the remaining Store fields from the options.
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}

CompleteWithOptions用于补全Store的默认字段,并初始化实现了storage.Interface接口的底层存储(e.Storage),该存储用于和etcd集群交互,存储请求数据。

//k8s.io/apiserver/pkg/registry/generic/registry/store.go
// CompleteWithOptions updates the store with the provided options and
// defaults common fields.
// NOTE: abridged excerpt; prefix, keyFunc, attrFunc and the final return are
// defined in parts that are not shown.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	// options.RESTOptions.GetRESTOptions is implemented in
	// k8s.io/apiserver/pkg/server/options/etcd.go
	opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
			// The indented lines below are not part of CompleteWithOptions:
			// the article inlines what GetRESTOptions builds.
			storageConfig, err := f.StorageFactory.NewConfig(resource)
			ret := generic.RESTOptions{
				StorageConfig:           storageConfig,
				Decorator:               generic.UndecoratedStorage,
				DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
				EnableGarbageCollection: f.Options.EnableGarbageCollection,
				ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
				CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
			}
	// Only create the underlying storage when none has been set yet.
	if e.Storage.Storage == nil {
		e.Storage.Codec = opts.StorageConfig.Codec
		var err error
		// opts.Decorator is generic.UndecoratedStorage (see the RESTOptions above).
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
	}
}

UndecoratedStorage调用栈,最后初始化了用于和etcd交互的client

//k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
UndecoratedStorage
	return NewRawStorage(config, newFunc)
		//k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
		return factory.Create(*config, newFunc)
			//k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
			return newETCD3Storage(c, newFunc)
				//k8s.io/apiserver/pkg/storage/etcd3/store.go
				return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
					return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)

// newStore assembles the etcd3-backed store from the given client, codec and
// settings, wiring in a watcher and a lease manager built on the same client.
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
	objVersioner := APIObjectVersioner{}

	// Rooting the prefix at "/" keeps compatibility with the etcd2
	// implementation: it is a no-op for the default prefix of '/registry' and
	// covers custom prefixes that don't start with '/'.
	rootedPrefix := path.Join("/", prefix)

	return &store{
		client:        c,
		codec:         codec,
		versioner:     objVersioner,
		transformer:   transformer,
		pagingEnabled: pagingEnabled,
		pathPrefix:    rootedPrefix,
		watcher:       newWatcher(c, codec, newFunc, objVersioner, transformer),
		leaseManager:  newDefaultLeaseManager(c, leaseManagerConfig),
	}
}

// Create implements storage.Interface.Create.
// NOTE: abridged excerpt; the encoding of obj into newData, the put options
// opts, and the handling of txnResp/err are not shown.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	// Commit an etcd transaction: the If guard is notFound(key) and the Then
	// branch puts the encoded object under key.
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		clientv3.OpPut(key, string(newData), opts...),
	).Commit()
}

2. kubeAPIServer

kubeapiserver用于处理对核心无分组资源和分组资源的CRUD请求。这里重点看一下这些资源的注册过程。

//cmd/kube-apiserver/app/server.go
// CreateKubeAPIServer creates and wires a workable kube-apiserver
// It completes kubeAPIServerConfig and builds the controlplane Instance with
// delegateAPIServer as its delegation target.
// NOTE: abridged excerpt; the check of err is not shown.
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)

	return kubeAPIServer, nil
}

//pkg/controlplane/instance.go
// New builds the kube-apiserver Instance on top of a generic server named
// "kube-apiserver" and installs the core (legacy) API and the grouped APIs
// into it.
// NOTE: abridged excerpt; elided parts are marked with "..." and the final
// return is not shown.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	...
	m := &Instance{
		GenericAPIServer:          s,
		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}

	// install legacy rest storage
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
		// 2.1 Install the core (group-less) resources. The exposed RESTful API
		// has the form /api/v1/resource, e.g. the pod and service resources.
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}

	// One storage provider per API group.
	restStorageProviders := []RESTStorageProvider{
		apiserverinternalrest.StorageProvider{},
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		discoveryrest.StorageProvider{},
		networkingrest.RESTStorageProvider{},
		noderest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		flowcontrolrest.RESTStorageProvider{},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.StorageProvider{},
		admissionregistrationrest.RESTStorageProvider{},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
	}
	// 2.2 Install the grouped resources. The exposed RESTful API has the form
	// /apis/group/version/resource, e.g. the deployment resource.
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
}

2.1 InstallLegacyAPI
安装核心无分组资源

// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
// NOTE: abridged excerpt; elided parts are marked with "...".
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
	// 2.1.1 Build the legacy REST storage and the APIGroupInfo describing it.
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	...
	// 2.1.2 Install the group under the legacy API prefix.
	//DefaultLegacyAPIPrefix = "/api"
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

2.1.1 NewLegacyRESTStorage
创建APIGroupInfo,并创建每种资源需要的storage,存放于VersionedResourcesStorageMap中。

//pkg/registry/core/rest/storage_core.go
// NewLegacyRESTStorage creates the APIGroupInfo for the core (group-less)
// resources and the storage of each resource, which is stored in
// VersionedResourcesStorageMap under the "v1" key.
// NOTE: abridged excerpt; nodeStorage, podDisruptionClient, restStorage and
// the other resources' storage are defined in parts that are not shown.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	apiGroupInfo := genericapiserver.APIGroupInfo{
		// Get the prioritized versions of the core group-less resources from
		// the global resource registry legacyscheme (for these resources there
		// is only the v1 version); "" denotes the empty group.
		PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
		// Holds the storage of each resource, per version.
		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
		Scheme:                       legacyscheme.Scheme,
		ParameterCodec:               legacyscheme.ParameterCodec,
		NegotiatedSerializer:         legacyscheme.Codecs,
	}

	podStorage, err := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)

	// Map from resource name to the storage serving it.
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		...
	}
	// The core group-less resources only have the v1 version.
	apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
	return restStorage, apiGroupInfo, nil
}

2.1.2 InstallLegacyAPIGroup

//k8s.io/apiserver/pkg/server/genericapiserver.go
// InstallLegacyAPIGroup installs the resources described by apiGroupInfo
// under apiPrefix by calling installAPIResources.
// NOTE: abridged excerpt; elided parts are marked with "..." and include the
// definition of openAPIModels.
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	...
	// Install the resources under the /api/ path; installAPIResources is
	// analysed again later.
	if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
		return err
	}
	...
	return nil
}

2.2 InstallAPIs
安装分组资源

//pkg/controlplane/instance.go
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
// NOTE: abridged excerpt; elided parts are marked with "...".
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	// Grouped resources are installed here, so several apiGroupInfo values are needed.
	apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
	...
	// restStorageProviders is a slice; each element stands for one API group.
	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		// Skip groups for which no version is enabled.
		if !apiResourceConfigSource.AnyVersionForGroupEnabled(groupName) {
			klog.V(1).Infof("Skipping disabled API group %q.", groupName)
			continue
		}
		// 2.2.1 Build the apiGroupInfo for this group.
		apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
		}
		if !enabled {
			klog.Warningf("API group %q is not enabled, skipping.", groupName)
			continue
		}

		klog.V(1).Infof("Enabling API group %q.", groupName)

		apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
	}
	// 2.2.2 Install all enabled API groups.
	// NOTE(review): the error returned by InstallAPIGroups is dropped in this
	// excerpt — presumably elided; verify against the full source.
	m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...)
	
	return nil
}

2.2.1 NewRESTStorage
NewRESTStorage是接口RESTStorageProvider中的一个方法,每种分组资源都有其实现。

// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
	// GroupName returns the name of the API group this provider serves.
	GroupName() string
	// NewRESTStorage builds the APIGroupInfo of the group. The bool result
	// tells the caller whether the group is enabled.
	NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}

这里以apps分组为例说明

//pkg/registry/apps/rest/storage_app.go
// StorageProvider is a struct for apps REST storage.
// It has no fields; its behaviour lives in its methods.
type StorageProvider struct{}

// NewRESTStorage returns the APIGroupInfo object of the apps group. When the
// v1 version is enabled, the v1 storage map is attached to it; a failure to
// build that map yields an empty APIGroupInfo, false and the error.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
	// const GroupName = "apps"
	groupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
	// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go with specific priorities.
	// TODO refactor the plumbing to provide the information in the APIGroupInfo

	// Nothing to attach when v1 is disabled.
	if !apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
		return groupInfo, true, nil
	}

	v1Resources, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
	if err != nil {
		return genericapiserver.APIGroupInfo{}, false, err
	}
	groupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = v1Resources

	return groupInfo, true, nil
}

v1Storage
apps分组包含deployments、daemonsets等资源,这里创建每种资源对应的storage

//pkg/registry/apps/rest/storage_app.go
// v1Storage creates the storage of every resource of the apps group and
// returns it keyed by resource name ("resource" or "resource/subresource").
// When one of the constructors fails, the map built so far is returned
// together with the error.
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
	resources := make(map[string]rest.Storage)

	// Deployments: main resource plus status and scale subresources.
	deployments, err := deploymentstore.NewStorage(restOptionsGetter)
	if err != nil {
		return resources, err
	}
	resources["deployments"] = deployments.Deployment
	resources["deployments/status"] = deployments.Status
	resources["deployments/scale"] = deployments.Scale

	// StatefulSets: main resource plus status and scale subresources.
	statefulSets, err := statefulsetstore.NewStorage(restOptionsGetter)
	if err != nil {
		return resources, err
	}
	resources["statefulsets"] = statefulSets.StatefulSet
	resources["statefulsets/status"] = statefulSets.Status
	resources["statefulsets/scale"] = statefulSets.Scale

	// DaemonSets: main resource plus status subresource.
	daemonSets, daemonSetsStatus, err := daemonsetstore.NewREST(restOptionsGetter)
	if err != nil {
		return resources, err
	}
	resources["daemonsets"] = daemonSets
	resources["daemonsets/status"] = daemonSetsStatus

	// ReplicaSets: main resource plus status and scale subresources.
	replicaSets, err := replicasetstore.NewStorage(restOptionsGetter)
	if err != nil {
		return resources, err
	}
	resources["replicasets"] = replicaSets.ReplicaSet
	resources["replicasets/status"] = replicaSets.Status
	resources["replicasets/scale"] = replicaSets.Scale

	// ControllerRevisions: main resource only.
	revisions, err := controllerrevisionsstore.NewREST(restOptionsGetter)
	if err != nil {
		return resources, err
	}
	resources["controllerrevisions"] = revisions

	return resources, nil
}

2.2.2 InstallAPIGroups

//k8s.io/apiserver/pkg/server/genericapiserver.go
// InstallAPIGroups exposes given api groups in the API.
// NOTE: abridged excerpt; elided parts are marked with "..." and include the
// definition of openAPIModels.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
	...
	for _, apiGroupInfo := range apiGroupInfos {
		// Install the group's resources under the /apis path.
		//APIGroupPrefix = "/apis"
		// NOTE(review): the error returned by installAPIResources is dropped in
		// this excerpt — presumably elided; verify against the full source.
		s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
		...
	}
	return nil
}

3. createAggregatorServer

aggregatorserver是serverchain上最后添加的server,所有的请求都会先经过aggregatorserver处理。

//cmd/kube-apiserver/app/aggregator.go
// createAggregatorServer completes aggregatorConfig and builds the aggregator
// server with delegateAPIServer as its delegation target.
// NOTE: abridged excerpt; everything after the NewWithDelegate call is elided.
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
	aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
	...
}

NewWithDelegate

//k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
// NewWithDelegate returns a new instance of APIAggregator from the given config.
// NOTE: abridged excerpt; the construction of s, the definition of
// resourceExpirationEvaluator, the error handling and the return are not
// shown.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	// Create the generic server.
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
	
	// Create the apiGroupInfo; this in turn creates the storage the resources need.
	apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
	
	// Install the routes; the exposed RESTful paths are:
	// /apis/apiregistration.k8s.io/v1/apiservices
	// /apis/apiregistration.k8s.io/v1/apiservices/status
	// InstallAPIGroup ends up calling installAPIResources, which is analysed
	// in detail later.
	s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
}

NewRESTStorage

//k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go
// NewRESTStorage returns an APIGroupInfo object that will work against apiservice.
// The "apiservices" and "apiservices/status" storage is registered under the
// "v1" key only when the v1 version is enabled.
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, shouldServeBeta bool) genericapiserver.APIGroupInfo {
	// const GroupName = "apiregistration.k8s.io"
	groupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)

	// Without v1 there is no storage to attach.
	if !apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
		return groupInfo
	}

	apiServices := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
	v1Resources := make(map[string]rest.Storage)
	v1Resources["apiservices"] = apiServices
	v1Resources["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServices)
	groupInfo.VersionedResourcesStorageMap["v1"] = v1Resources

	return groupInfo
}

NewREST

//k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
// It fills in a genericregistry.Store for APIService objects and completes it
// with the REST options from optsGetter; a failure to complete the store
// panics.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST {
	apiServiceStrategy := apiservice.NewStrategy(scheme)

	apiServiceStore := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &apiregistration.APIService{} },
		NewListFunc:              func() runtime.Object { return &apiregistration.APIServiceList{} },
		PredicateFunc:            apiservice.MatchAPIService,
		DefaultQualifiedResource: apiregistration.Resource("apiservices"),

		CreateStrategy:      apiServiceStrategy,
		UpdateStrategy:      apiServiceStrategy,
		DeleteStrategy:      apiServiceStrategy,
		ResetFieldsStrategy: apiServiceStrategy,

		// TODO: define table converter that exposes more than name/creation timestamp
		TableConvertor: rest.NewDefaultTableConvertor(apiregistration.Resource("apiservices")),
	}

	err := apiServiceStore.CompleteWithOptions(&generic.StoreOptions{
		RESTOptions: optsGetter,
		AttrFunc:    apiservice.GetAttrs,
	})
	if err != nil {
		panic(err) // TODO: Propagate error up
	}

	return &REST{apiServiceStore}
}

4. installAPIResources

安装无分组资源和分组资源最后都会调用到installAPIResources,这里重点看一下此函数

//k8s.io/apiserver/pkg/server/genericapiserver.go
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
// NOTE: abridged excerpt; elided parts are marked with "..." and include the
// construction of apiGroupVersion and the handling of err.
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
	var resourceInfos []*storageversion.ResourceInfo
	// Iterate over all versions of this group.
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		...
		// Install the routes into GoRestfulContainer.
		r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
		resourceInfos = append(resourceInfos, r...)
	}
	...
	return nil
}

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
// NOTE: abridged excerpt; elided parts are marked with "...".
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
	// The route prefix is <root>/<group>/<version>.
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}

	// Install builds the web service holding the routes of this group version.
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	...
	// Add ws to the container.
	container.Add(ws)
	return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}

// Install handlers for API resources.
// It creates a new web service and calls registerResourceHandlers for every
// storage path of the group, in sorted path order.
// NOTE: abridged excerpt; the handling of each registerResourceHandlers result
// is elided.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var resourceInfos []*storageversion.ResourceInfo
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	// Collect the keys of the storage map first; map iteration order is random.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		...
	}
	return apiResources, resourceInfos, ws, errors
}

registerResourceHandlers
为path对应的handler(由storage提供)构造route,并将route添加到ws。这是一个很长的函数,这里只截取了部分代码展示大概流程

//k8s.io/apiserver/pkg/endpoints/install.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	admit := a.group.Admit
	...
	// what verbs are supported by the storage, used to know what verbs we support per path
	//根据golang的断言语法判断storage是否支持Creater等接口
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	...
	// Get the list of actions for the given scope.
	switch {
	case !namespaceScoped:
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		suffix := ""
		...
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		...
	}
	
	for _, action := range actions {
		switch action.Verb {
		...
		case "POST": // Create a resource.
			var handler restful.RouteFunction
			if isNamedCreater {
				//创建handler,收到对应的请求时调用handler
				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
			} else {
				handler = restfulCreateResource(creater, reqScope, admit)
			}

			handler = utilwarning.AddWarningsHandler(handler, warnings)
			article := GetArticleForNoun(kind, " ")
			doc := "create" + article + kind
			if isSubresource {
				doc = "create " + subresource + " of" + article + kind
			}
			//将route和handler关联起来
			route := ws.POST(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
				// but currently go-restful can't handle multiple different objects being returned.
				Returns(http.StatusCreated, "Created", producedObject).
				Returns(http.StatusAccepted, "Accepted", producedObject).
				Reads(defaultVersionedObject).
				Writes(producedObject)
			routes = append(routes, route)
			...
		}
		
		for _, route := range routes {
			//将route添加到webservice中
			ws.Route(route)
		}
	}
本文含有隐藏内容,请 开通VIP 后查看