kubernetes代码版本: v1.20.2
前一节大致看了一下apiserver 的启动流程,以及组成kube-apiserver的三个组件,这一节看看三个组件都会用到的一个非常重要的对象GenericAPIServer, 它是一个HTTP Server的抽象, 虽然这么说很抽象。它会提供注册路由的入口以及各种钩子函数的注册入口。
其实apiserver中的三个组件都会创建GenericAPIServer,看哪个其实都差不多,所以这些选择了apiExtensionsServer中的创建代码。
func CreateServerChain() (*aggregatorapiserver.APIAggregator, error) {
// 1.
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
}
func createAPIExtensionsServer(/*参数省略*/) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
// 2.
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
//3.
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
return s, nil
}
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// 4.
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
//5.
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
// 6.
s := &GenericAPIServer{
Handler: apiServerHandler,
listedPathProvider: apiServerHandler,
}
return s, nil
}
// 7.
func NewAPIServerHandler() *APIServerHandler {
// 8.
nonGoRestfulMux := mux.NewPathRecorderMux(name)
// 9.
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
// 10. goreset
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{})
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
// 11
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
代码分解如下:
apiExtensionsServer对象GenericAPIServer对象BuildHandlerChainFunc。apiServerHandler, 这是一个Handler, 它实现了ServeHTTP方法apiServerHandler构建GenericAPIServernonGoRestfulMux路由对象,与gorestfulContainer相对应notFoundHandler是nil, 所以apiExtensionsServer,所以这里不会设置notFoundHandler, 而是使用NewPathRecorderMux里的http.NotFoundHandler()go-restful的Container对象, go-restful是一个面向RESTful的go web框架, k8s的资源几乎都注册在它上面director对象,它负责在nonGoRestfulMux和gorestfulContainer之间路由,最后返回APIServerHandler上一节说过三个组件的是通过委托默认来调用的,即aggregatorServer ->kubeAPIServer -> aggregatorServer -> apiExtensionsServer,那么apiExtensionsServer如果也无法响应怎么办呢?这里的代码给出了回答, 它如果无法响应就会将请求委托给genericapiserver.NewEmptyDelegate(), 而emptyDelegate的UnprotectedHandler方法返回nil,所以最终会使用http.NotFoundHandler()作为apiserver的最终兜底方案,即返回404.
这里提到了nonGoRestfulMux,gorestfulContainer两个对象,前者用于注册一些非RESTful风格的路由,比如/health, /metrics等接口,而/apis/apps/v1, /api/v1等都是RESTful风格的路由,所以两者使用不同的对象来注册路由。
前文简单的提及了一下PrepareRun, 这里再简单重复一下
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// 可以看到GenericServer占据了C位
prepared := s.GenericAPIServer.PrepareRun()
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.delegationTarget.PrepareRun()
return preparedGenericAPIServer{s}
}
代码并不复杂,最终是通过delegationTarget.PrepareRun()将ServerChain上的各个组件都执行到,最后用preparedGenericAPIServer包装一下。
从前文知道APIAggregator最终是运行Run方法,这里深入一下
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// 1.
delayedStopCh := make(chan struct{})
go func() {
defer close(delayedStopCh)
<-stopCh
close(s.readinessStopCh)
time.Sleep(s.ShutdownDelayDuration)
}()
// 2.
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
// 3.
<-stopCh
err = s.RunPreShutdownHooks()
// 4.
<-stoppedCh
s.HandlerChainWaitGroup.Wait()
return nil
}
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
// 5.
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
}
// 6.
s.RunPostStartHooks(stopCh)
return stoppedCh, nil
}
代码分解如下:
这里主要是运行各种钩子函数,而NonBlockingRun也比较好比较理解,以非阻塞的方式运行,所以后面的等待都是通过通道来阻塞主进程,后面就应该是具体的接受客户端请求的逻辑了。
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
// 1.
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
// 2.
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
}
// 设置http服务器的各种参数,比如读写数据流的帧(FrameSize)大小
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
go func() {
defer utilruntime.HandleCrash()
var listener net.Listener
listener = tcpKeepAliveListener{ln}
// 3.
err := server.Serve(listener)
select {
case <-stopCh:
klog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
return stoppedCh, nil
}
代码分解如下:
net/http构造http serverGenericAPIServer.Handler, 这个方法会负责处理请求GenericAPIServer的启动分为了两部分,一部分是自己业务相关的设置和钩子函数执行,一部分是http标准库的相关设置,然后我们就可以回过头来看看GenericAPIServer.Handler
在此之前让我们回顾一下GenericAPIServer和APIServerHandler的数据结构
type GenericAPIServer struct {
Handler *APIServerHandler
}
type APIServerHandler struct {
FullHandlerChain http.Handler
GoRestfulContainer *restful.Container
NonGoRestfulMux *mux.PathRecorderMux
Director http.Handler
}
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
// 1.
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 2.
a.FullHandlerChain.ServeHTTP(w, r)
}
代码分解如下:
director外面用handlerChainBuilder包一层, 也就是将通用的处理方法(比如认证, 授权等)构造成一条链,作为公共对象,因为三个组件都会用到。处理链对象(handlerChain)的各项操作以后单独再说,这里先看director对象是怎么处理请求的。
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// 1.
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
if path == "/apis" || path == "/apis/" {
d.goRestfulContainer.Dispatch(w, req)
return
}
case strings.HasPrefix(path, ws.RootPath()):
// ensure an exact match or a path boundary match
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}
// 2.
d.nonGoRestfulMux.ServeHTTP(w, req)
}
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 3.
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
exactHandler.ServeHTTP(w, r)
return
}
for _, prefixHandler := range h.prefixHandlers {
if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
prefixHandler.handler.ServeHTTP(w, r)
return
}
}
// 4.
h.notFoundHandler.ServeHTTP(w, r)
}
代码分解如下:
nonGoRestfulMuxaggregatorServer的下一级是kubeAPIServer。至此我们大致了解了kube-apiserver中的三个组件的核心组件GenericAPIServer是怎么将请求链接起来并移交请求的,以及内部的路由数据流,但我们还不了解怎么处理一些通用操作,比如认证,鉴权,审计等,以及k8s的api资源是怎么注册路由的,这两个问题我们放在后面的文章来说明。
kubernetes代码版本: v1.20.2
前一节大致看了一下apiserver 的启动流程,以及组成kube-apiserver的三个组件,这一节看看三个组件都会用到的一个非常重要的对象GenericAPIServer, 它是一个HTTP Server的抽象, 虽然这么说很抽象。它会提供注册路由的入口以及各种钩子函数的注册入口。
其实apiserver中的三个组件都会创建GenericAPIServer,看哪个其实都差不多,所以这些选择了apiExtensionsServer中的创建代码。
func CreateServerChain() (*aggregatorapiserver.APIAggregator, error) {
// 1.
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
}
func createAPIExtensionsServer(/*参数省略*/) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
// 2.
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
//3.
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
return s, nil
}
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
// 4.
handlerChainBuilder := func(handler http.Handler) http.Handler {
return c.BuildHandlerChainFunc(handler, c.Config)
}
//5.
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
// 6.
s := &GenericAPIServer{
Handler: apiServerHandler,
listedPathProvider: apiServerHandler,
}
return s, nil
}
// 7.
func NewAPIServerHandler() *APIServerHandler {
// 8.
nonGoRestfulMux := mux.NewPathRecorderMux(name)
// 9.
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
// 10. goreset
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{})
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
// 11
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
代码分解如下:
apiExtensionsServer对象GenericAPIServer对象BuildHandlerChainFunc。apiServerHandler, 这是一个Handler, 它实现了ServeHTTP方法apiServerHandler构建GenericAPIServernonGoRestfulMux路由对象,与gorestfulContainer相对应notFoundHandler是nil, 所以apiExtensionsServer,所以这里不会设置notFoundHandler, 而是使用NewPathRecorderMux里的http.NotFoundHandler()go-restful的Container对象, go-restful是一个面向RESTful的go web框架, k8s的资源几乎都注册在它上面director对象,它负责在nonGoRestfulMux和gorestfulContainer之间路由,最后返回APIServerHandler上一节说过三个组件的是通过委托默认来调用的,即aggregatorServer ->kubeAPIServer -> aggregatorServer -> apiExtensionsServer,那么apiExtensionsServer如果也无法响应怎么办呢?这里的代码给出了回答, 它如果无法响应就会将请求委托给genericapiserver.NewEmptyDelegate(), 而emptyDelegate的UnprotectedHandler方法返回nil,所以最终会使用http.NotFoundHandler()作为apiserver的最终兜底方案,即返回404.
这里提到了nonGoRestfulMux,gorestfulContainer两个对象,前者用于注册一些非RESTful风格的路由,比如/health, /metrics等接口,而/apis/apps/v1, /api/v1等都是RESTful风格的路由,所以两者使用不同的对象来注册路由。
前文简单的提及了一下PrepareRun, 这里再简单重复一下
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// 可以看到GenericServer占据了C位
prepared := s.GenericAPIServer.PrepareRun()
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
s.delegationTarget.PrepareRun()
return preparedGenericAPIServer{s}
}
代码并不复杂,最终是通过delegationTarget.PrepareRun()将ServerChain上的各个组件都执行到,最后用preparedGenericAPIServer包装一下。
从前文知道APIAggregator最终是运行Run方法,这里深入一下
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// 1.
delayedStopCh := make(chan struct{})
go func() {
defer close(delayedStopCh)
<-stopCh
close(s.readinessStopCh)
time.Sleep(s.ShutdownDelayDuration)
}()
// 2.
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
// 3.
<-stopCh
err = s.RunPreShutdownHooks()
// 4.
<-stoppedCh
s.HandlerChainWaitGroup.Wait()
return nil
}
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
// 5.
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
}
// 6.
s.RunPostStartHooks(stopCh)
return stoppedCh, nil
}
代码分解如下:
这里主要是运行各种钩子函数,而NonBlockingRun也比较好比较理解,以非阻塞的方式运行,所以后面的等待都是通过通道来阻塞主进程,后面就应该是具体的接受客户端请求的逻辑了。
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
// 1.
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
// 2.
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
}
// 设置http服务器的各种参数,比如读写数据流的帧(FrameSize)大小
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
go func() {
defer utilruntime.HandleCrash()
var listener net.Listener
listener = tcpKeepAliveListener{ln}
// 3.
err := server.Serve(listener)
select {
case <-stopCh:
klog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
return stoppedCh, nil
}
代码分解如下:
net/http构造http serverGenericAPIServer.Handler, 这个方法会负责处理请求GenericAPIServer的启动分为了两部分,一部分是自己业务相关的设置和钩子函数执行,一部分是http标准库的相关设置,然后我们就可以回过头来看看GenericAPIServer.Handler
在此之前让我们回顾一下GenericAPIServer和APIServerHandler的数据结构
type GenericAPIServer struct {
Handler *APIServerHandler
}
type APIServerHandler struct {
FullHandlerChain http.Handler
GoRestfulContainer *restful.Container
NonGoRestfulMux *mux.PathRecorderMux
Director http.Handler
}
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
// 1.
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 2.
a.FullHandlerChain.ServeHTTP(w, r)
}
代码分解如下:
director外面用handlerChainBuilder包一层, 也就是将通用的处理方法(比如认证, 授权等)构造成一条链,作为公共对象,因为三个组件都会用到。处理链对象(handlerChain)的各项操作以后单独再说,这里先看director对象是怎么处理请求的。
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// 1.
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
if path == "/apis" || path == "/apis/" {
d.goRestfulContainer.Dispatch(w, req)
return
}
case strings.HasPrefix(path, ws.RootPath()):
// ensure an exact match or a path boundary match
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}
// 2.
d.nonGoRestfulMux.ServeHTTP(w, req)
}
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 3.
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
exactHandler.ServeHTTP(w, r)
return
}
for _, prefixHandler := range h.prefixHandlers {
if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
prefixHandler.handler.ServeHTTP(w, r)
return
}
}
// 4.
h.notFoundHandler.ServeHTTP(w, r)
}
代码分解如下:
nonGoRestfulMuxaggregatorServer的下一级是kubeAPIServer。至此我们大致了解了kube-apiserver中的三个组件的核心组件GenericAPIServer是怎么将请求链接起来并移交请求的,以及内部的路由数据流,但我们还不了解怎么处理一些通用操作,比如认证,鉴权,审计等,以及k8s的api资源是怎么注册路由的,这两个问题我们放在后面的文章来说明。