一只肥羊的思考

kubelet创建Pod流程解析

简介

这篇文章主要是针对 kubelet 对 pod 的生命周期管理逻辑中,有关 pod 的创建的过程的解析。

就像我们在之前的文章中解释的那样,kubelet 整体是工作在生产者,消费者模型中的,所以当 一个 pod 创建的事件进入队列后,kubelet 就会及时收到这个事件,然后调用一系列相关的子模块,完成 pod 的创建工作。

流程解析

kubelet 针对 Pod 创建实现了一个 Handler 函数,HandlePodAdditions,该函数中包含pod 在创建的过程中的所有操作。我们就依次来看一下所有设计到的操作:

Step 1: 添加新创建的 Pod 到 podManager 子模块中

podManager 子模块负责管理这台机器上的 pod 的信息,pod 和 mirrorPod 之间的对应关系等等。

在创建 pod 时第一步操作就是把这个 Pod 的信息存入 podManager 中,供后续子模块查询使用

1
2
3
4
5
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
....
kl.podManager.AddPod(pod)
....
}

代码链接

Step 2: 将创建 Pod 的工作下发给 podWorkers 子模块

podWorkers 子模块 主要负责为每一个 Pod 启动,并且创建单独的 goroutine 去真实的对这个 Pod 完成相应的操作,比如创建,更新,删除等等。

而 HandlePodAdditions 中的对 dispatchWork 函数的调用就会真实的把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers,在这里我们关注的是 Pod 的创建动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
...
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
...
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
...
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
...
}

Step 3: podWorkers 子模块为该 Pod 启动一个处理对 Pod 的更新事件的 goroutine

podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和 更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。

所以 podWorker 创建针对每一个的 Pod 的【处理 goroutine】 还有【下发事件给处理 goroutine】的动作都是在 UpdatePod 函数中完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
...
if podUpdates, exists = p.podUpdates[uid]; !exists { // 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel
// 创建 channel
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// 启动 goroutine
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] { // 下发更新事件
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
....
}

Step 4: 处理 pod 更新事件的 goroutine 开始操作

podWorkers 为每一个 pod 启动一个处理事件的 goroutine,goroutine 主要调用的是 managePodLoop 函数,这个函数内部将会利用 range 阻塞式的等待更新事件的到来,直到 channel 被关闭。

针对每一个事件 goroutine 会调用 kubelet 对象的 syncPodFn 函数真实的完成这次更新行为。

在完成这次 sync 动作之后,会调用 wrapUp 函数,这个函数将会做几件事情

  • 将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync
  • 将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
...
for update := range podUpdates {
...
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
...
p.wrapUp(update.Pod.UID, err)
}

在 managePodLoop 中调用的 syncPodFn 的实现其实是 kubelet 对象的 syncPod 方法

在这个方法中,主要完成几件事情

  • 首先根据从 container runtime 那边获取的当前 pod 所有容器的状态,以及 pod 的描述信息得出 pod 整体的状态,并且发送给 kubelet.statusManager。而 statusManager 的工作就是周期性的把 podStatus 同步到 apiserver 去。
  • 然后调用 kubelet.containerManager 的 NewPodContainerManager 方法,为这个 pod 创建一个 cgroup 的 manager,这个 manager 对象专门用户为这个 pod 创建,并且管理 pod 级别的 cgroup。所以紧接着,针对 pod 的创建事件,这个 pod 的 cgroup manager 主要做两件事情
    • 创建 pod level cgroup
    • 更新这台机器的 qos level cgroup
  • 然后为这个 pod 创建一系列必要的本地目录,主要用于存放 kubelet 的元数据,pod 内容器的挂载点等等。
  • 然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好。
  • 最终调用 kubelet.containerRuntime 组件,创建这个 pod 实体。containerRuntime 组件也是 kubelet 中用于负责同 container runtime 进行交互的组件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// 1. 同步 podStatus 到 kubelet.statusManager
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
...
kl.statusManager.SetPodStatus(pod, apiPodStatus)
...
// 2. 创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup
pcm := kl.containerManager.NewPodContainerManager()
...
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
...
if err := pcm.EnsureExists(pod); err != nil {
...
// 3. 为 pod 创建数据目录
if err := kl.makePodDataDirs(pod); err != nil {
...
// 4. 等待 kubelet.volumeManager 为 pod 完成必要的 volume 准备
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
// 5. 最终调用 kubelet.containerRuntime 的 SyncPod 方法,完成 pod 实体的创建
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
...
}

Step 5: 创建 Pod 实体

在上面一个步骤中,我们看到主要完成的都是创建 Pod 实体(即容器)之前需要完成的准备工作。在准备工作完成之后,最终调用 kubelet.containerRuntime 子模块的 SyncPod 函数真正完成 Pod 内容器实体的创建。

这个 SyncPod 方法的实现位于 kuberuntime 包中,其中主要完成几个步骤

  • 创建 PodSandbox 容器
  • 如果 Pod 中指定了 initContainers 的话,则创建 InitContainer 容器
  • 创建用户业务 Container
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
...
// 1. 创建 PodSandbox 容器
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
...
// 2. 启动 initContainer 容器
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
...
// 3. 启动业务容器
for _, idx := range podContainerChanges.ContainersToStart {
...
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
...
}

当所有一切动作都完成之后,一个完整的 Pod 就已经创建起来了!