阅读kube-apiserver的过程中,会发现很多的AddPostStartHook的代码,这部分代码用于执行kube-apiserver启动之后的逻辑,因为他们放在启动后执行更适合,所以就提供了两种钩子(Hook), PostStartHook和PreShutdownHook。这里只看PostStartHook,并且只看bootstrap-controller对应的钩子函数。
当集群初次创建的时候,查看k8s的service的时候会发现,有一个叫做kubernetes的service,比如下面这样。
kubectl get service
# 输出如下
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 3s
如果不小心删除了,会发现它又自动创建了,那么它是怎么实现的呢?
这里简单的列一下之前的调用链
func CreateServerChain(*aggregatorapiserver.APIAggregator, error) {
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
}
func CreateKubeAPIServer() {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New() {
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
func (m *Instance) InstallLegacyAPI() error {
// 1.
controllerName := "bootstrap-controller"
// 2.
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
// 3.
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
// 4.
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
// 5.
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
return nil
}
前面的调用链就不细说了。
代码分解如下:
controller名字controllerPostStartHook然后我们继续这个controller的创建和启动逻辑
func (c *completedConfig) NewBootstrapController() *Controller {
// 1.
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
// 2. kube-system, kube-public, kube-node-lease
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
return &Controller{
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
SystemNamespaces: systemNamespaces,
SystemNamespacesInterval: 1 * time.Minute,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
}
}
// 3.
func (c *Controller) PostStartHook() error {
c.Start()
return nil
}
// 4.
func (c *Controller) Start() {
// 5.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
// 6.
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
}
// 忽略验证/修复服务ip的逻辑
// 7.
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
c.runner.Start()
}
// 8.
func (r *Runner) Start() {
if r.stop == nil {
c := make(chan struct{})
r.stop = &c
for i := range r.loopFuncs {
go r.loopFuncs[i](*r.stop)
}
}
}
代码分解如下:
--service-cluster-ip-range=10.96.0.0/12参数的第一个IP是10.96.0.1Service对应的Endpoints,用于删除对应的Endpointkube-apiserver可能还没有启动完成,所以先删除对应的Endpoint(如果有的话)Runner对象,传入一串后续要执行的函数这个实现比较简单,就顺便一起看看
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() {
// Loop the system namespace list, and create them if they do not exist
for _, ns := range c.SystemNamespaces {
if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
}
}
}, c.SystemNamespacesInterval, ch)
}
代码比较简单,就是不断的检查是否有必要创建,默认间隔是每分钟。
func (c *Controller) RunKubernetesService(ch chan struct{}) {
// 1.
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.readyzClient.Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
// 2.
wait.NonSlidingUntil(func() {
// 3.
if err := c.UpdateKubernetesService(false); err != nil {
runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
}
}, c.EndpointInterval, ch)
}
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// 4.
if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
return err
}
return nil
}
代码分解如下:
kubernetes service, 如果需要的话Kube-apiserver处了处理客户端请求,还会起一堆的controller用于监控必要的资源并镜像相关的操作,这些操作通过注册钩子函数的方式来实现。
阅读kube-apiserver的过程中,会发现很多的AddPostStartHook的代码,这部分代码用于执行kube-apiserver启动之后的逻辑,因为他们放在启动后执行更适合,所以就提供了两种钩子(Hook), PostStartHook和PreShutdownHook。这里只看PostStartHook,并且只看bootstrap-controller对应的钩子函数。
当集群初次创建的时候,查看k8s的service的时候会发现,有一个叫做kubernetes的service,比如下面这样。
kubectl get service
# 输出如下
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 3s
如果不小心删除了,会发现它又自动创建了,那么它是怎么实现的呢?
这里简单的列一下之前的调用链
func CreateServerChain(*aggregatorapiserver.APIAggregator, error) {
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
}
func CreateKubeAPIServer() {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New() {
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
func (m *Instance) InstallLegacyAPI() error {
// 1.
controllerName := "bootstrap-controller"
// 2.
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
// 3.
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
// 4.
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
// 5.
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
return nil
}
前面的调用链就不细说了。
代码分解如下:
controller名字controllerPostStartHook然后我们继续这个controller的创建和启动逻辑
func (c *completedConfig) NewBootstrapController() *Controller {
// 1.
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
// 2. kube-system, kube-public, kube-node-lease
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
return &Controller{
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
SystemNamespaces: systemNamespaces,
SystemNamespacesInterval: 1 * time.Minute,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
}
}
// 3.
func (c *Controller) PostStartHook() error {
c.Start()
return nil
}
// 4.
func (c *Controller) Start() {
// 5.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
// 6.
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err)
}
// 忽略验证/修复服务ip的逻辑
// 7.
c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
c.runner.Start()
}
// 8.
func (r *Runner) Start() {
if r.stop == nil {
c := make(chan struct{})
r.stop = &c
for i := range r.loopFuncs {
go r.loopFuncs[i](*r.stop)
}
}
}
代码分解如下:
--service-cluster-ip-range=10.96.0.0/12参数的第一个IP是10.96.0.1Service对应的Endpoints,用于删除对应的Endpointkube-apiserver可能还没有启动完成,所以先删除对应的Endpoint(如果有的话)Runner对象,传入一串后续要执行的函数这个实现比较简单,就顺便一起看看
func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() {
// Loop the system namespace list, and create them if they do not exist
for _, ns := range c.SystemNamespaces {
if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
}
}
}, c.SystemNamespacesInterval, ch)
}
代码比较简单,就是不断的检查是否有必要创建,默认间隔是每分钟。
func (c *Controller) RunKubernetesService(ch chan struct{}) {
// 1.
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.readyzClient.Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
// 2.
wait.NonSlidingUntil(func() {
// 3.
if err := c.UpdateKubernetesService(false); err != nil {
runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
}
}, c.EndpointInterval, ch)
}
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// 4.
if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
return err
}
return nil
}
代码分解如下:
kubernetes service, 如果需要的话Kube-apiserver处了处理客户端请求,还会起一堆的controller用于监控必要的资源并镜像相关的操作,这些操作通过注册钩子函数的方式来实现。