type
Post
status
Published
date
May 1, 2023
slug
2023/kubelet
summary
How is a Pod created, and why does cross-node communication in K8s require installing a plugin?
tags
Kubernetes
Cloud Security
category
Technical Sharing
icon
password
The goal of this analysis is to clarify how a Pod is created, and why cross-node communication in K8s requires installing a network plugin.
Version: v1.26.3, commit: 9e644106593f3f4aa98f8a84b23db5fa378900bd
Service startup call stack
[k8s.io/kubernetes/cmd/kubelet]
• kubelet.go:35 -
command := app.NewKubeletCommand()
[k8s.io/kubernetes/cmd/kubelet/app]
• server.go:265 -
Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
• server.go:419 -
run(ctx, s, kubeDeps, featureGate)
• server.go:783 -
RunKubelet(s, kubeDeps, s.RunOnce)
A quick aside on the code logic here: checking whether communication with containerd works
[k8s.io/kubernetes/pkg/kubelet/kubelet.go]
• kubelet.go:314 -
remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider)
[k8s.io/kubernetes/pkg/kubelet/cri/remote]
• remote_runtime.go:96 -
grpc.DialContext(ctx, addr, dialOpts...)
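This dial can be reproduced outside the kubelet. Below is a minimal sketch (not kubelet code) that dials the containerd CRI socket over gRPC, the same connection NewRemoteRuntimeService sets up, and calls Version to confirm the channel works. The socket path is the containerd default and is an assumption; adjust it for your node.

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Default containerd CRI endpoint (assumption; adjust for your node).
	conn, err := grpc.DialContext(ctx, "unix:///run/containerd/containerd.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Same generated client the kubelet ends up using (runtimeServiceClient in api.pb.go).
	client := runtimeapi.NewRuntimeServiceClient(conn)
	resp, err := client.Version(ctx, &runtimeapi.VersionRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Println("runtime:", resp.RuntimeName, resp.RuntimeVersion)
}

Continuing with the startup call stack (back in k8s.io/kubernetes/cmd/kubelet/app):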
• server.go:1157 -
createAndInitKubelet(kubeServer,
....this part of the code mainly reads configuration passed on the kubelet command line

For example, the configuration behind the well-known unauthenticated --anonymous-auth attack is initialized in this part.
[k8s.io/kubernetes/pkg/kubelet/server]
• server.go:1216 -
kubelet.NewMainKubelet(&kubeServer.KubeletConfiguration,
...[k8s.io/kubernetes/pkg/kubelet]
• kubelet.go:328 -
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration
….[k8s.io/kubernetes/pkg/kubelet/server]
• server.go:1180 -
k.RunOnce(podCfg.Updates())
• server.go:1185 -
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
• server.go:1197 -
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
[k8s.io/kubernetes/pkg/kubelet]
• kubelet.go:2513 -
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp)
[k8s.io/kubernetes/pkg/kubelet/server]
• server.go:177 -
s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile)
• server.go:1200 -
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
• server.go:1203 -
go k.ListenAndServePodResources()
The service we normally interact with, the one that ultimately listens over HTTP/HTTPS, follows the path kubelet.go:328 → server.go:1185 → server.go:1197 → kubelet.go:2513 → server.go:177.

The commonly exploited kubelet read-only port information leak instead goes through server.go:1200, which listens on the read-only port.
At this point we arrive at the file where the API endpoints are defined.
API endpoint paths
[k8s.io/kubernetes/pkg/kubelet/server]
• server.go:279-288 -
InstallDefaultHandlers
server.InstallDebuggingDisabledHandlers()
and similar functions register different API endpoints depending on whether debug mode is enabled. The pods/run/exec endpoint definitions exploited via unauthorized access are at
• server.go:375
• server.go:288
s.addMetricsBucketMatcher("run") s.addMetricsBucketMatcher("exec") s.addMetricsBucketMatcher("attach") s.addMetricsBucketMatcher("portForward") s.addMetricsBucketMatcher("containerLogs") s.addMetricsBucketMatcher("runningpods") s.addMetricsBucketMatcher("pprof") s.addMetricsBucketMatcher("logs") paths := []string{ "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/", "/runningpods/", pprofBasePath, logsPath} for _, p := range paths { s.restfulCont.Handle(p, h) } .... ws := new(restful.WebService) ws. Path("/pods"). Produces(restful.MIME_JSON) ws.Route(ws.GET(""). To(s.getPods). Operation("getPods")) s.restfulCont.Add(ws) s.addMetricsBucketMatcher("stats") s.restfulCont.Add(stats.CreateHandlers(statsPath, s.host, s.resourceAnalyzer)) s.addMetricsBucketMatcher("metrics") s.addMetricsBucketMatcher("metrics/cadvisor") s.addMetricsBucketMatcher("metrics/probes") s.addMetricsBucketMatcher("metrics/resource")
Pod startup
At this point we understand the complete kubelet startup process. Pods are created by the kubelet, yet in these call stacks we have not seen any creation entry point with "Pod" in its name.
In fact, just above go k.ListenAndServe there is a go k.Run(podCfg.Updates()).
Stepping into the syncLoop() method: the kubelet synchronizes Pod state by calling kl.containerRuntime.SyncPod. When it finds that a new Pod needs to be created, it calls m.runtimeService.RunPodSandbox to handle it. That call performs the following steps in order:
- Create and set up the Pod's state according to the Pod definition.
- Create and start the Pod's network namespace and network interfaces.
- Create and configure each container's filesystem, network, storage volumes and other resources.
- Start the container processes.
The concrete implementation of the second and third steps depends on the container runtime in use; a sketch of the network part follows below.
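The second step is precisely why a network plugin has to be installed: the kubelet only asks the runtime for a sandbox over CRI, and the runtime (containerd's CRI plugin, for example) wires up the sandbox's network namespace by invoking whatever CNI plugin is configured. A minimal sketch of such a CNI invocation via the libcni library, with illustrative paths and IDs; this is not the actual containerd code:

package main

import (
	"context"
	"fmt"

	"github.com/containernetworking/cni/libcni"
)

func main() {
	// Conventional locations for CNI plugin binaries and configs (assumptions).
	cninet := libcni.NewCNIConfig([]string{"/opt/cni/bin"}, nil)
	conf, err := libcni.ConfListFromFile("/etc/cni/net.d/10-mynet.conflist")
	if err != nil {
		panic(err)
	}

	// RuntimeConf describes the sandbox whose network namespace should be wired up.
	rt := &libcni.RuntimeConf{
		ContainerID: "example-sandbox-id",           // hypothetical sandbox ID
		NetNS:       "/var/run/netns/example-netns", // hypothetical netns path
		IfName:      "eth0",
	}

	// ADD is the CNI operation the runtime issues when a sandbox is created.
	result, err := cninet.AddNetworkList(context.Background(), conf, rt)
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}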
Call stack:
[k8s.io/kubernetes/pkg/kubelet]
• kubelet.go:1558 -
kl.syncLoop(ctx, updates, kl)
syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver, and http) and creates a union of them. For any new change seen, will run a sync against desired state and running state. If no changes are seen to the configuration, will synchronize the last known desired state every sync-frequency seconds. Never returns.
• kubelet.go:2147 -
kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
• kubelet.go:2204 -
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
• kubelet.go:2369 -
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
I traced down the wrong path here at first; it later turned out to be kubelet.go:2369.
• kubelet.go:2344
[k8s.io/kubernetes/pkg/kubelet/pod]
• pod_manager.go:151
• pod_manager.go:157 - I could not trace any further from this point
• kubelet.go:2311 -
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
• pod_workers.go:750 -
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
	defer runtime.HandleCrash()
	p.managePodLoop(outCh)
}()
• pod_workers.go:950 -
p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
This function signature corresponds to the one in k8s.io/kubernetes/pkg/kubelet/kubelet.go:
func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
and that is where the logic below lives.
[k8s.io/kubernetes/pkg/kubelet]
• kubelet.go:1712 - if the network plugin is not ready, only Pods using the host network are allowed to start
// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
	kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
	return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
This should be where the Pod is actually created.
• kubelet.go:1833 -
kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
[k8s.io/kubernetes/pkg/kubelet/kuberuntime]
• kuberuntime_manager.go:767 -
podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
• kuberuntime_sandbox.go:69 -
podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
[k8s.io/kubernetes/pkg/kubelet/cri/remote/remote_runtime.go]
• remote_runtime.go:170 -
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
	Config:         config,
	RuntimeHandler: runtimeHandler,
})
[k8s.io/kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1]
• api.pb.go:10189 -
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
	out := new(RunPodSandboxResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
That is, in api.proto:
rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
In the end, the kubelet hands the request to containerd over gRPC, and containerd in turn drives containerd-shim and runc to create the container.
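To see what actually crosses that gRPC boundary, here is a minimal sketch, roughly what crictl runp does, that sends a RunPodSandbox request directly to containerd's CRI socket. The socket path, metadata and UID are illustrative assumptions; this is not kubelet code:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "unix:///run/containerd/containerd.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := runtimeapi.NewRuntimeServiceClient(conn)

	// A bare-bones sandbox config; real kubelet requests carry much more
	// (DNS config, port mappings, linux security context, cgroup parent, ...).
	config := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      "demo",
			Namespace: "default",
			Uid:       "demo-uid", // hypothetical UID
			Attempt:   0,
		},
	}

	// /runtime.v1.RuntimeService/RunPodSandbox, exactly the rpc from api.proto above.
	resp, err := client.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{Config: config})
	if err != nil {
		panic(err)
	}
	fmt.Println("sandbox id:", resp.PodSandboxId)
}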
Three sources of Pod configuration: around the code below, goroutines (file watchers, periodic HTTP fetches, apiserver watches) collect Pod change information and push it into three channels. When syncLoop() iterates and finds that a channel carries a valid update, it proceeds to create Pods and perform the other operations described above. A merged sketch of the pattern follows the three snippets below.
k8s.io/kubernetes/pkg/kubelet/config/file_linux.go
• file_linux.go:76 -
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
k8s.io/kubernetes/pkg/kubelet/config/http.go
• http.go:105 -
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.HTTPSource}
k8s.io/kubernetes/pkg/kubelet/config/apiserver.go
• apiserver.go:64 -
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
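Tying the three sources together, here is a simplified sketch of the overall pattern: three producers feed a single PodUpdate-style channel, and a consumer loop standing in for syncLoop reacts to whatever arrives. The types are stripped-down stand-ins, not the real kubetypes definitions:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for kubetypes.PodUpdate and its Op/Source fields.
type PodUpdate struct {
	Source string
	Op     string
	Pods   []string
}

func main() {
	updates := make(chan PodUpdate)

	// Three producers, mirroring the file, http and apiserver config sources.
	for _, src := range []string{"file", "http", "apiserver"} {
		go func(source string) {
			updates <- PodUpdate{Source: source, Op: "SET", Pods: []string{source + "-pod"}}
		}(src)
	}

	// A stand-in for syncLoop: block on the channel and handle each update as it arrives.
	timeout := time.After(time.Second)
	for i := 0; i < 3; i++ {
		select {
		case u := <-updates:
			fmt.Printf("sync %s update from %s: %v\n", u.Op, u.Source, u.Pods)
		case <-timeout:
			return
		}
	}
}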