一只肥羊的思考

kubelet 的 Pod 信息源

背景简介

在之前的博文中,我们总结过,kubelet 整体是工作在「生产者与消费者」的模型之下的。

而其中生产者生产的消息就是有关「pod 发生变化」的通知,那么 kubelet 都有哪些消息生产者呢?

在这一篇博文中,我们主要关注的就是 kubelet 中的信息生产者,以及他们都生产了哪些消息

生产者介绍

首先我们可以再回顾一下 kubelet 中一段比较重要的函数 syncLoopIteration,这个是 kubelet 中最重要的循环函数,这个函数的内容就是在不停的通过 watch 所有消息生产者带来的消息,从而最终启动对应的 worker 函数去处理。

而在这个函数中我们就可以看到所有的消息生产者了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
return true
}

我们可以看到,一共有 5 在阻塞等待的 channel,其中 syncCh,housekeepCh,都是周期性触发的信号,属于被动的消息。而真正主动的消息生产者就是

  • configCh:这个 channel 是由 Dependencies 类型中 PodConfig 对象提供的 channel);该对象主要用于管理来自 3 个不同的信息源所发送的 pod 信息,apiserver,file,http,这也是 kubelet 中最为重要的一个 pod 事件源。这个事件源主要关注的是 Pod 静态信息(PodSpec)的变化。

  • plegCh:这个 channel 是由 Kubelet 对象中 pleg 子模块提供的 channel,这个子模块关注的是 Pod 动态信息的变化,比如 Container 的状态变化(已启动,已停止)。

  • update:这个 channel 是有 Kubelet 对象中的 livenessManager 子模块提供的 channel,这个子模块关注的是有关 Container 的 Liveness Probe 结果信息。

下面我们分别对这 3 个事件源的具体实现进行介绍。

PodConfig

在上一小节,我们简要介绍了,PodConfig 对象主要是监听来自 3 种不同来源的 pod 信息,进行汇总发送给 kubelet。而它所发送的消息格式是由 PodUpdate 这个类型来描述的

1
2
3
4
5
type PodUpdate struct {
Pods []*v1.Pod
Op PodOperation
Source string
}

kubelet 就是通过 watch PodConfig 对象传递给它的 PodUpdate 类型的 channel 来获取信息的

1
2
3
4
5
6
7
8
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
}

那么 PodConfig 自身又是如何工作的呢?

PodConfig 为每一个信息源(file,http,apiserver),分别创建了两个资源

  • 一个 channel: 用于接受该信息源发送过来的 pod 信息
  • 一个 本地缓存 map: 用于暂时缓存在这个机器上, 最近的来自这个源的所有 pod 的信息

每一个信息源,都会向 PodConfig 为它提供的 channel 中发送他获取的有关这个来源的 pod 信息。但是这些信息很可能会有重复的现象,所以 PodConfig 在获取到这些信息的时候,还会最终调用一个 Merge 函数,该函数会基于这个源的 pod 信息缓存来判断到底有哪些对 pod 的更新是真的需要关注的,以及到底是怎样的更新行为,比如创建,删除,更新等等。

举个栗子,apiserver 这个信息源对象,它是一个以 cache.NewUndeltaStore 为缓存的 Reflector 类型的对象,下面是它的初始化函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}

如果你不了解这个 cache.NewUndeltaStore 还有 Reflector 对象也没有关系,总之,这个 apiserver 的行为是这样的:无论是任何一个或者多个 pod 的创建,删除还是更新操作,都会导致这个对象把它当前得到所有和这个机器相关的 pod 信息都发送给 PodConfig 给他的 channel。自然这会导致大量的重复,所以 PodConfig 需要进行 Merge 过滤。

PodConfig 的 Merge 函数的具体细节如下

1
2
3
4
5
6
7
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
...
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
...
}

可见其中会调用一个 merge 函数,该函数作用就是针对不同的源发送过来的消息进行过滤筛选,筛选出哪些 pod 的新创建的,对应的就是返回值 adds;哪些是更新的,对应的就是返回 updates 等等。

而 merge 函数作为过滤器其中最重要的就是它如何判断当前信息的类型的子函数,就是 checkAndUpdatePod 子函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// checkAndUpdatePod updates existing, and:
// * if ref makes a meaningful change, returns needUpdate=true
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
// * else return all false
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
// 1. this is a reconcile
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if !podsDifferSemantically(existing, ref) {
// this is not an update
// Only check reconcile when it is not an update, because if the pod is going to
// be updated, an extra reconcile is unnecessary
if !reflect.DeepEqual(existing.Status, ref.Status) {
// Pod with changed pod status needs reconcile, because kubelet should
// be the source of truth of pod status.
existing.Status = ref.Status
needReconcile = true
}
return
}
...
if ref.DeletionTimestamp != nil {
needGracefulDelete = true
} else {
// 3. this is an update
needUpdate = true
}
}

从上面的函数体也可以看出来,kubelet 是如何定义这个 pod 是需要 update 的还是 reconcile 的,还是需要 delete 的。

最终 PodConfig 会把来自 3 个来源的信息经过过滤后都统一发送到一个 channel 里,也就是 kubelet 对象 watch 的 channel。

PLEG

在一开始的时候,我们也介绍过,PodConfig 关注的是 Pod 的静态信息的变化,而 PLEG 模块则关注的是有关 Pod 动态信息的变化了,主要指的就是 Container 的状态。

所以 PLEG 子模块工作逻辑也比较简单,就是一个周期性执行的 relist 函数,该函数的操作主要包括

  • 访问 container runtime 获取当前所有 Pod 的 Container 的最新状态信息
  • 根据缓存中记录的上一次 PLEG 获取的 Container 信息,进行比较,如果有变化则生成一个 PodLifecycleEvent 信息
  • 根据整理出来的所有有 PodLifecycleEvent 事件发生的 Pod,依次再从 container runtime 或者这个 pod 最新状态细节,更新到本地缓存中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (g *GenericPLEG) relist() {
...
// 访问 container runtime 获取当前所有 Pod 的 Container 的最新状态信息
podList, err := g.runtime.GetPods(true)
...
// 根据缓存中记录的上一次 PLEG 获取的 Container 信息,进行比较,如果有变化则生成一个 PodLifecycleEvent 信息
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
...
// 根据整理出来的所有有 PodLifecycleEvent 事件发生的 Pod,依次再从 container runtime 或者这个 pod 最新状态细节,更新到本地缓存中
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
...
if err := g.updateCache(pod, pid); err != nil {
glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod
continue
}
}
...
}

所以这个消息源产生的消息类型为 PodLifecycleEvent,其中就包括了这个 Pod UID,以及发生的时间的类型,以及一些相关重要数据。

1
2
3
4
5
6
7
8
9
10
type PodLifecycleEvent struct {
// The pod ID.
ID types.UID
// The type of the event.
Type PodLifeCycleEventType
// The accompanied data which varies based on the event type.
// - ContainerStarted/ContainerStopped: the container name (string).
// - All other event types: unused.
Data interface{}
}

之所以 k8s 引入 PLEG 这种方案,就是为了能够尽可能减少 kubelet 对 container runtime 的 api 频繁调用,导致 CPU 利用率升高,kubelet 可扩展性降低。更多的具体细节可以参考 [1]

LivenessManager

livenessManager 是实现针对 container 进行 liveness probe 的子模块。Pod 中的每一个 container 都可以配置 Liveness Probe 特性,其中可以指定 3 种不同的 Probe 行为

  • http
  • tcp
  • exec

具体的使用方式可以参考[2]

而真实对 container 进行 liveness probe 的模块就是 kubelet 对象中的 livenessManager 以及 probeManager

其中 probeManager 对象会针对每一个配置了 liveness probe 或者 readiness probe 的 pod 分配一个 worker 对象,并且保存在一个 map 中,每个 worker 会启动一个 goroutine 对 container 进行指定的 probe 操作。

如下就是 probeManager 对象的类型 manager 的声明,

1
2
3
4
5
6
7
8
9
10
11
12
type manager struct {
// Map of active workers for probes
workers map[probeKey]*worker
...
// livenessManager manages the results of liveness probes
livenessManager results.Manager
// prober executes the probe actions.
prober *prober
}

其中比较重要的就是 map workers ,里面针对每一个需要 liveness probe 或者 readiness probe 的 container 都保存了一个 worker 对象。而这个 worker 对象会最终启动一个周期性的 probe goroutine,执行 liveness probe 的行为,并且把结果传输给 livenessManager 对象中。

而 livenessManager 对象所属的类型 result.Manager 其实也就是一个 channel,当 worker 进行 probe 的 goroutine 发现 liveness probe 有问题时,就会把这个消息传递给这个 channel,从而 kubelet 就可以 watch 到这个事件,从而进行异常处理。比如重启这个容器。

参考引用

  1. https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md
  2. https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/