本文主要看一下apiserver的启动及三种server安装路由的流程。
kube-apiserver进程启动入口
//cmd/kube-apiserver/apiserver.go
import (
"k8s.io/kubernetes/cmd/kube-apiserver/app"
...
)
// main is the kube-apiserver process entry point. It seeds rand with the
// current time, installs the flag-name normalize function, builds the command
// via app.NewAPIServerCommand and executes it.
func main() {
rand.Seed(time.Now().UnixNano())
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
command := app.NewAPIServerCommand()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
// Exit with a non-zero status when the command fails.
// NOTE(review): os.Exit does not run deferred functions, so logs.FlushLogs is skipped on this path.
os.Exit(1)
}
}
使用cobra框架,最终调用Run
//cmd/kube-apiserver/app/server.go
//通过包导入的方式注册apiserver资源
import (
//创建extensions全局注册表,并注册extensions资源到extensions全局注册表
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
//创建aggregator全局注册表,并注册aggregator资源到aggregator全局注册表
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
//创建kubeapiserver全局注册表
"k8s.io/kubernetes/pkg/api/legacyscheme"
//注册kubeapiserver资源到kubeapiserver全局注册表
"k8s.io/kubernetes/pkg/controlplane"
...
)
// NewAPIServerCommand creates a *cobra.Command object with default parameters
// NOTE(review): abridged excerpt — the definition of s, the error check after
// Complete and the final return of cmd are not shown in these lines.
func NewAPIServerCommand() *cobra.Command {
cmd := &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
// Complete the options, then pass them to Run together with the value
// returned by SetupSignalHandler, which Run receives as its stop channel.
completedOptions, err := Complete(s)
return Run(completedOptions, genericapiserver.SetupSignalHandler())
}
}
}
Run主流程,短短几行,但其内部的实现流程还是比较长的,接下来慢慢看
// Run runs the specified APIServer. This should never exit.
//
// It builds the server chain, prepares the head of the chain and then runs it
// with stopCh. An error from building or preparing the chain is returned to
// the caller instead of being dropped.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	// Create the server chain. The kube-apiserver process starts three kinds of
	// servers that are linked into a chain to serve HTTP; details follow later.
	server, err := CreateServerChain(completeOptions, stopCh)
	if err != nil {
		return err
	}

	// Preparation work before the server is started.
	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	// Finally start the HTTP server and begin accepting client requests.
	return prepared.Run(stopCh)
}
CreateServerChain
// CreateServerChain creates the apiservers connected via delegation.
// The servers are built from the tail of the chain towards its head
// (extension server, kube-apiserver, aggregator) and the aggregator is returned.
// NOTE(review): the err values are never checked in this abridged excerpt.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
// Build the shared configuration from the command-line options.
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
// Derive the configuration needed by the extension server from the shared configuration.
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
// Create the extension server from its configuration.
// The extension server is the last server in the chain, so it is given NewEmptyDelegate():
// it delegates to no further server, and a request it cannot handle results in a 404.
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
// Create the kube-apiserver from the shared configuration.
// The extension server is passed in as the kube-apiserver's delegation target.
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
// Derive the configuration needed by the aggregator server from the shared configuration.
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
// Create the aggregator server from its configuration.
// The kube-apiserver is passed in as the aggregator's delegation target.
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
// Only aggregatorServer needs to be returned: it is the first server of the chain, so every
// request reaches it first; what it cannot handle is forwarded to the kube-apiserver, and
// what that cannot handle either is finally forwarded to the extension server.
return aggregatorServer, nil
}
CreateKubeAPIServerConfig
根据命令行参数创建通用配置
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
// NOTE(review): abridged excerpt — proxyTransport is not defined in the lines shown, the error
// from buildGenericConfig is not checked, and parts of the Config literal are elided.
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
// Build the generic configuration from the server run options.
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
// Combine the generic configuration with the kube-apiserver specific ExtraConfig.
config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
...
ServiceIPRange: s.PrimaryServiceClusterIPRange,
APIServerServiceIP: s.APIServerServiceIP,
SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
APIServerServicePort: 443,
...
}
...
return config, serviceResolver, pluginInitializers, nil
}
// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
// All results are named, so the bare return statements below return their current values.
// NOTE(review): abridged excerpt — err, clientgoExternalClient and kubeClientConfig are not
// declared in the lines shown, and the result of Admission.ApplyTo is not checked here.
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
admissionPostStartHook genericapiserver.PostStartHookFunc,
storageFactory *serverstorage.DefaultStorageFactory,
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
...
// Build the authenticator from the configuration.
// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
// Build the authorizer from the configuration.
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
// Build admission control from the configuration.
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
return
}
1. createAPIExtensionsServer
apiextensionsserver用于处理对CRD资源的CRUD请求。
//cmd/kube-apiserver/app/apiextensions.go
// createAPIExtensionsServer completes the given configuration and creates the
// apiextensions server from it, with delegateAPIServer as its delegation target.
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
//k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New returns a new instance of CustomResourceDefinitions from the given config.
// NOTE(review): abridged excerpt — several err values are not checked, code is elided
// with "...", and the final return is not shown.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// Create the generic server, passing the server name and the delegation target;
// this function is explained separately later.
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// The CustomResourceDefinitions server struct.
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
apiResourceConfig := c.GenericConfig.MergedResourceConfig
//const GroupName = "apiextensions.k8s.io"
// Create the apiGroupInfo.
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// NOTE(review): the "return APIGroupInfo{...}" fragment below is not part of New; it appears
// to be the body of NewDefaultAPIGroupInfo inlined here for illustration.
return APIGroupInfo{
//k8s.io/apiserver/pkg/server/genericapiserver.go
// Get all versions registered for the group from the scheme (higher priority first).
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
// TODO unhardcode this. It was hardcoded before, but we need to re-evaluate
OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
Scheme: scheme,
ParameterCodec: parameterCodec,
NegotiatedSerializer: codecs,
}
//var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
// storage is the underlying storage interface; every resource has its own storage.
// When a request to create a CRD arrives, the storage interface is called to save the data to etcd.
storage := map[string]rest.Storage{}
// customresourcedefinitions
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
storage["customresourcedefinitions"] = customResourceDefinitionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}
// Install the two resources above; the exposed RESTful paths are:
// /apis/apiextensions.k8s.io/v1/customresourcedefinitions
// /apis/apiextensions.k8s.io/v1/customresourcedefinitions/status
// InstallAPIGroup ends up calling installAPIResources, which is analysed in detail later.
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
// Get the delegate's handler; fall back to the default NotFound handler when it is nil.
delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
delegateHandler = http.NotFoundHandler()
}
versionDiscoveryHandler := &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
delegate: delegateHandler,
}
groupDiscoveryHandler := &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
delegateHandler,
...
)
// Register the non-go-restful paths.
// Exact match on /apis: a request whose path is /apis is served by crdHandler.
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
// Prefix match on /apis/: a request whose path starts with /apis/ is served by crdHandler.
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
...
}
NewREST
每种k8s资源都需要对外提供restful风格的资源存储服务API,而且必须实现下面的接口
// Storage is the interface a resource's REST storage must implement.
// NOTE(review): only the New method is shown in this excerpt.
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
}
REST通过内嵌的genericregistry.Store实现了上面接口中的New方法
//k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// REST is the storage type for custom resource definitions. It embeds
// *genericregistry.Store, so the methods of Store are promoted to REST.
type REST struct {
*genericregistry.Store
}
//k8s.io/apiserver/pkg/registry/generic/registry/store.go
// Store is the generic registry store type.
// NOTE(review): abridged excerpt — only the NewFunc field is shown.
type Store struct {
// NewFunc returns a new instance of the type this registry returns for a
// GET of a single object, e.g.:
//
// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
NewFunc func() runtime.Object
...
}
// Store implements the New() method of the storage interface.
// New implements RESTStorage.New.
// It simply returns whatever the configured NewFunc returns.
func (e *Store) New() runtime.Object {
return e.NewFunc()
}
NewREST初始化结构体genericregistry.Store
//k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
// It fills in a genericregistry.Store for custom resource definitions, completes it
// with the given options getter and wraps it in a REST.
// NOTE(review): abridged excerpt — strategy is defined in the elided part.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
...
// The genericregistry.Store struct lives in k8s.io/apiserver/pkg/registry/generic/registry/store.go
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },
NewListFunc: func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },
PredicateFunc: MatchCustomResourceDefinition,
DefaultQualifiedResource: apiextensions.Resource("customresourcedefinitions"),
CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
ResetFieldsStrategy: strategy,
// TODO: define table converter that exposes more than name/creation timestamp
TableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
// Complete the store with the options; a failure is returned to the caller.
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
return &REST{store}, nil
}
CompleteWithOptions会初始化e.Storage(其底层对象实现了storage.Interface接口),用于和etcd集群交互,存储请求数据。
//k8s.io/apiserver/pkg/registry/generic/registry/store.go
// CompleteWithOptions updates the store with the provided options and
// defaults common fields.
// NOTE(review): abridged excerpt — prefix, keyFunc, attrFunc, f and resource are not
// defined in the lines shown, err is not checked, and the final return is elided.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
// options.RESTOptions.GetRESTOptions is in k8s.io/apiserver/pkg/server/options/etcd.go
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
// NOTE(review): the lines up to the closing brace of the RESTOptions literal appear to be
// the body of GetRESTOptions, inlined here for illustration.
storageConfig, err := f.StorageFactory.NewConfig(resource)
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
// Only create the underlying storage when none has been set yet.
if e.Storage.Storage == nil {
e.Storage.Codec = opts.StorageConfig.Codec
var err error
// opts.Decorator is generic.UndecoratedStorage
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
}
}
UndecoratedStorage调用栈,最后初始化了用于和etcd交互的client
//k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
UndecoratedStorage
return NewRawStorage(config, newFunc)
//k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
return factory.Create(*config, newFunc)
//k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
return newETCD3Storage(c, newFunc)
//k8s.io/apiserver/pkg/storage/etcd3/store.go
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)
// newStore builds the etcd3-backed store from the given client, codec, transformer,
// paging flag and lease manager configuration, and returns it.
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}
return result
}
// Create implements storage.Interface.Create.
// NOTE(review): abridged excerpt — newData and opts are not defined in the lines shown,
// and the handling of txnResp/err and the return are elided.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
// Run a transaction: if the notFound(key) condition holds, put the new data under key.
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
}
2. kubeAPIServer
kubeapiserver用于处理对核心无分组资源和分组资源的CRUD请求。这里重点看一下这些资源的注册过程。
//cmd/kube-apiserver/app/server.go
// CreateKubeAPIServer creates and wires a workable kube-apiserver
//
// delegateAPIServer is passed to New as the delegation target. An error from
// building the server is returned to the caller instead of being discarded.
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}
	return kubeAPIServer, nil
}
//pkg/controlplane/instance.go
// New creates the kube-apiserver Instance on top of a generic server and installs
// the legacy (core) API and the grouped APIs into it.
// NOTE(review): abridged excerpt — the err from GenericConfig.New is not checked and the
// final return is not shown.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
...
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
// install legacy rest storage
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
// 2.1 Install the core (group-less) resources; the exposed RESTful API paths look like
// /api/v1/resource, e.g. the pod and service resources.
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
// One storage provider per API group to be installed.
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
// 2.2 Install the grouped resources; the exposed RESTful API paths look like
// /apis/group/version/resource, e.g. the deployment resource.
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
}
2.1 InstallLegacyAPI
安装核心无分组资源
// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
// NOTE(review): abridged excerpt — the err from NewLegacyRESTStorage is handled in the elided part, if at all.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
// 2.1.1 Build the legacy REST storage and its apiGroupInfo.
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
...
// 2.1.2 Install the group under the legacy API prefix.
//DefaultLegacyAPIPrefix = "/api"
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
2.1.1 NewLegacyRESTStorage
创建APIGroupInfo,并创建每种资源需要的storage,存放于VersionedResourcesStorageMap中。
//pkg/registry/core/rest/storage_core.go
// NewLegacyRESTStorage builds the APIGroupInfo for the core group and the storage of each
// of its resources, which is stored in VersionedResourcesStorageMap under "v1".
// NOTE(review): abridged excerpt — nodeStorage, podDisruptionClient and restStorage are not
// defined in the lines shown, and the err from podstore.NewStorage is not checked.
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
// Get the prioritized versions of the core group from the global scheme legacyscheme
// (the core group only has the v1 version); the empty group name "" denotes the core group.
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
// Holds the storage of the resources of each version.
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: legacyscheme.Scheme,
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
// Map from resource name to its storage.
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
...
}
// The core group only has the v1 version.
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
return restStorage, apiGroupInfo, nil
}
2.1.2 InstallLegacyAPIGroup
//k8s.io/apiserver/pkg/server/genericapiserver.go
// InstallLegacyAPIGroup installs the resources of apiGroupInfo under apiPrefix.
// NOTE(review): abridged excerpt — openAPIModels is defined in the elided part.
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
...
// Install the resources under the /api/ path; this function is analysed again later.
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}
...
return nil
}
2.2 InstallAPIs
安装分组资源
//pkg/controlplane/instance.go
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
//
// Providers whose group has no enabled version, or whose NewRESTStorage reports
// the group as not enabled, are skipped. An error from building a group's
// storage or from installing the collected groups is returned to the caller.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	// Grouped resources are installed here, so several APIGroupInfo values are collected.
	apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
	...
	// restStorageProviders is a slice; every element stands for one API group.
	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		if !apiResourceConfigSource.AnyVersionForGroupEnabled(groupName) {
			klog.V(1).Infof("Skipping disabled API group %q.", groupName)
			continue
		}
		// 2.2.1 Build the apiGroupInfo for this group.
		apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
		}
		if !enabled {
			klog.Warningf("API group %q is not enabled, skipping.", groupName)
			continue
		}
		klog.V(1).Infof("Enabling API group %q.", groupName)
		apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
	}
	// 2.2.2 Install all enabled groups; a registration failure is reported
	// to the caller rather than silently ignored.
	if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}
2.2.1 NewRESTStorage
NewRESTStorage是RESTStorageProvider接口中的一个方法,每种分组资源都有其实现。
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
// GroupName returns the name of the API group this provider serves.
GroupName() string
// NewRESTStorage returns the APIGroupInfo for the group, a bool that callers
// treat as "group enabled", and an error.
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}
这里以apps分组为例说明
//pkg/registry/apps/rest/storage_app.go
// StorageProvider is a struct for apps REST storage.
// It has no fields.
type StorageProvider struct{}
// NewRESTStorage returns APIGroupInfo object.
// The v1 storage map is only built and registered when the apps v1 version is enabled;
// on success the bool result is always true.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
//const GroupName = "apps"
// Create the apiGroupInfo.
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go with specific priorities.
// TODO refactor the plumbing to provide the information in the APIGroupInfo
if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, false, err
}
apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
}
return apiGroupInfo, true, nil
}
v1Storage
apps分组包含deployments,daemonsets等资源,这里创建每种资源对应的storage
//pkg/registry/apps/rest/storage_app.go
// v1Storage builds the storage of every apps/v1 resource and returns it as a map keyed by
// resource (or resource/subresource) name. On the first failure it returns the map built
// so far together with the error.
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
storage := map[string]rest.Storage{}
// deployments
deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
if err != nil {
return storage, err
}
storage["deployments"] = deploymentStorage.Deployment
storage["deployments/status"] = deploymentStorage.Status
storage["deployments/scale"] = deploymentStorage.Scale
// statefulsets
statefulSetStorage, err := statefulsetstore.NewStorage(restOptionsGetter)
if err != nil {
return storage, err
}
storage["statefulsets"] = statefulSetStorage.StatefulSet
storage["statefulsets/status"] = statefulSetStorage.Status
storage["statefulsets/scale"] = statefulSetStorage.Scale
// daemonsets
daemonSetStorage, daemonSetStatusStorage, err := daemonsetstore.NewREST(restOptionsGetter)
if err != nil {
return storage, err
}
storage["daemonsets"] = daemonSetStorage
storage["daemonsets/status"] = daemonSetStatusStorage
// replicasets
replicaSetStorage, err := replicasetstore.NewStorage(restOptionsGetter)
if err != nil {
return storage, err
}
storage["replicasets"] = replicaSetStorage.ReplicaSet
storage["replicasets/status"] = replicaSetStorage.Status
storage["replicasets/scale"] = replicaSetStorage.Scale
// controllerrevisions
historyStorage, err := controllerrevisionsstore.NewREST(restOptionsGetter)
if err != nil {
return storage, err
}
storage["controllerrevisions"] = historyStorage
return storage, nil
}
2.2.2 InstallAPIGroups
//k8s.io/apiserver/pkg/server/genericapiserver.go
// Exposes given api groups in the API.
// NOTE(review): abridged excerpt — openAPIModels is defined in the elided part, and the
// error returned by installAPIResources is not checked in the lines shown.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
...
for _, apiGroupInfo := range apiGroupInfos {
// Install the resources under the /apis prefix.
//APIGroupPrefix = "/apis"
s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
...
}
return nil
}
3. createAggregatorServer
aggregatorserver是serverchain上最后添加的server,所有的请求都会先经过aggregatorserver处理。
//cmd/kube-apiserver/app/aggregator.go
// createAggregatorServer completes the aggregator configuration and creates the aggregator
// server with delegateAPIServer as its delegation target.
// NOTE(review): abridged excerpt — the error check and the return are elided.
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
...
}
NewWithDelegate
//k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
// NewWithDelegate returns a new instance of APIAggregator from the given config.
// NOTE(review): abridged excerpt — s and resourceExpirationEvaluator are not defined in the
// lines shown, the errors are not checked, and the return is not shown.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
// Create the generic server.
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
// Create the apiGroupInfo, which in turn creates the storage the resources need.
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
// Install the routes; the exposed RESTful paths are:
// /apis/apiregistration.k8s.io/v1/apiservices
// /apis/apiregistration.k8s.io/v1/apiservices/status
// InstallAPIGroup ends up calling installAPIResources, which is analysed in detail later.
s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
}
NewRESTStorage
//k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go
// NewRESTStorage returns an APIGroupInfo object that will work against apiservice.
// The apiservices storage is only registered (under "v1") when the v1 version is enabled.
// NOTE(review): shouldServeBeta is not used in the lines shown.
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, shouldServeBeta bool) genericapiserver.APIGroupInfo {
//const GroupName = "apiregistration.k8s.io"
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)
if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
storage["apiservices"] = apiServiceREST
storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
}
return apiGroupInfo
}
NewREST
//k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
// It fills in a genericregistry.Store for APIService objects and completes it with the
// given options getter; it panics if completing the store fails.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST {
strategy := apiservice.NewStrategy(scheme)
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apiregistration.APIService{} },
NewListFunc: func() runtime.Object { return &apiregistration.APIServiceList{} },
PredicateFunc: apiservice.MatchAPIService,
DefaultQualifiedResource: apiregistration.Resource("apiservices"),
CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
ResetFieldsStrategy: strategy,
// TODO: define table converter that exposes more than name/creation timestamp
TableConvertor: rest.NewDefaultTableConvertor(apiregistration.Resource("apiservices")),
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: apiservice.GetAttrs}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}
return &REST{store}
}
4. installAPIResources
安装无分组资源和分组资源最后都会调用到installAPIResources,这里重点看一下此函数
//k8s.io/apiserver/pkg/server/genericapiserver.go
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
// NOTE(review): abridged excerpt — apiGroupVersion is built in the elided part and the err
// from InstallREST is not checked in the lines shown.
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
var resourceInfos []*storageversion.ResourceInfo
// Iterate over all versions of this group.
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
...
// Install the routes into GoRestfulContainer.
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
resourceInfos = append(resourceInfos, r...)
}
...
return nil
}
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
// The returned error aggregates the registration errors reported by the installer.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
// The route prefix is <root>/<group>/<version>.
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
...
// Add the web service to the container.
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
// Install handlers for API resources.
// It creates a new web service and registers the handlers of every storage path of the
// group into it, in sorted path order.
// NOTE(review): abridged excerpt — the handling of each registerResourceHandlers result is elided.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
ws := a.newWebService()
// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
paths := make([]string, len(a.group.Storage))
var i int = 0
// Collect the map keys; map iteration order is random, hence the sort below.
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
...
}
return apiResources, resourceInfos, ws, errors
}
registerResourceHandlers
将path对应的handler(storage提供)构造route,并将route添加到ws。这是一个很长的函数,只截取了部分代码展示大概流程
//k8s.io/apiserver/pkg/endpoints/install.go
// registerResourceHandlers builds a route for each action supported by the storage behind
// path and adds the routes to ws.
// NOTE(review): heavily abridged excerpt — many identifiers used below (actions, routes,
// reqScope, kind, namer, params, ...) are defined in elided code and the return is not shown.
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
admit := a.group.Admit
...
// what verbs are supported by the storage, used to know what verbs we support per path
// Use Go type assertions to find out whether storage supports interfaces such as Creater.
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
...
// Get the list of actions for the given scope.
switch {
case !namespaceScoped:
// Handle non-namespace scoped resources like nodes.
resourcePath := resource
resourceParams := params
itemPath := resourcePath + "/{name}"
nameParams := append(params, nameParam)
proxyParams := append(nameParams, pathParam)
suffix := ""
...
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
...
}
for _, action := range actions {
switch action.Verb {
...
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
// Create the handler; it is invoked when a matching request is received.
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = utilwarning.AddWarningsHandler(handler, warnings)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
// Bind the route to the handler.
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
// TODO: in some cases, the API may return a v1.Status instead of the versioned object
// but currently go-restful can't handle multiple different objects being returned.
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
routes = append(routes, route)
...
}
for _, route := range routes {
// Add the route to the web service.
ws.Route(route)
}
}