一只肥羊的思考


  • Home

  • Archives

  • Tags
一只肥羊的思考

kubernetes volume 管理

Posted on 2018-08-26

什么是 Volume 特性

在 kubernetes 中,数据存储是一个非常关键的问题。用户的某些类型的服务经常会有对数据存储方面的一些需求,比如:

  • 数据需要在容器重启之后,甚至 pod 被删除后,仍旧被保留
  • 有些数据需要在同一个 pod 之间进行共享

此时就需要用到 kubernetes 的 volume 特性[1]了。

而 kubernetes 中的 volume 特性使用起来也是非常简单的,我们只需要在 pod template spec 中指定我们希望使用的 volume,以及这个 volume 希望被这个 pod 中的每一个 container 以什么样子的方式来使用就好了。

举个栗子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
apiVersion: v1
kind: Pod
metadata:
name: test-volume
spec:
containers:
- name: test-container1
image: busybox
volumeMounts:
- name: config-vol
mountPath: /etc/config
- name: test-container2
image: busybox
volumeMounts:
- name: config-vol
mountPath: /mnt/config
- name: data
mountPath: /data
volumes:
- name: config-vol
configMap:
name: log-config
- name: data
persistentVolumeClaim:
claimName: test-pvc

上面的这个 pod 其实就使用了两个 volume,分别叫做

  • config-vol
  • data

而这两个 volume 分别是用不同的方式提供的

  • config-vol 的数据是由一个叫做 log-config 的 configmap 来提供的
  • data 的数据则是由一个叫做 test-pvc 的 persistentVolumeClaim 对象来提供的

而这个 pod 中的两个 container 在使用两个 volume 时的使用姿势也是有差别的

  • test-container1 仅仅使用了 config-vol 这个 volume,并且挂载到了自己的 /etc/config 路径下面
  • test-container2 也使用了 config-vol 这个 volume,但是它挂载到了自己的 /mnt/config 路径下面;另外它也是用了 data 这个 volume,并且挂载到了 /data 目录下面。

所以总结来看 kubernetes 中的 volume 是为了满足 pod 的各种数据存储需求所提供的特性。

  • 每一个 pod 使用的 volume 背后可能由不同的存储方式来提供的,比如 configmap,emptyDir,persistentVolumeClaim 等等
  • pod 中的每一个 container 都可以用自己的方式来使用这个 pod 中的任意一个 volume

而至于这个 volume 中的数据的生命周期,则跟 volume 背后是由哪种存储方式提供强相关。

  • 但是可以保证一点的是,无论采用哪种存储方式提供,volume 机制都能够保证在 container 发生重启的时候数据不会丢失。

常见的 Volume 类型

在上一节中我们有一个非常重要的结论:

  • volume 中的数据的生命周期,则跟 volume 背后是由哪种存储方式提供强相关。

所以根据我们的业务场景的中对数据的持久性,读写性能等等的不同需求,我们也需要选择不同的 Volume 类型来满足

比如我们平时比较常见的 Volume 类型包括

  • configmap:常用于为业务提供启动配置文件,这种小文件读取的需求;特点是只读,并且生命周期独立于 pod,即 pod 消失 configmap 仍旧存在
  • emptyDir:常用于同一个 pod 中的多个 container 进行少量文件的数据共享;特点是可读写,但是读写性能有限(取决于 kubelet 业务数据盘背后采用什么存储),并且没有 quota 限制,生命周期和 pod 完全一致,pod 被销毁,emptyDir 也会被删除
  • hostPath: 常用于 pod 希望访问主机上的某一个路径的场景,特点是可读写,生命周期独立于 pod
  • cephfs/nfs/rbd: 常用于 pod 希望使用网络存储来对数据进行持久化的场景
  • persistentVolumeClaim[2]:最新的一种让 pod 申请 volume 存储的形式,pod 可以通过创建一个 persistentVolumeClaim 对象来指明自己对这块 volume 存储的需求,比如大小,存储类型(比如 nfs,ceph rbd),生命周期完全独立于 pod。

PersistentVolumeClaim 机制

在上小节我们提到的 volume 的类型很丰富,包括 nfs,ceph rbd,cephfs,awsElasticBlockStore 等等,这些类型在 persistentVolumeClaim 概念出现之前是可以直接在 pod volume 字段中直接写明对这种 volume 的需求的,比如 cephfs,如果不采用 persistentVolumeClaim 机制的话,你需要在 volume 的字段中显式的写入跟这个 cephfs 的 volume 相关的配置,比如 monitors 的地址等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: v1
kind: Pod
metadata:
name: cephfs
spec:
containers:
- name: cephfs-rw
image: kubernetes/pause
volumeMounts:
- mountPath: "/mnt/cephfs"
name: cephfs
volumes:
- name: cephfs
cephfs:
monitors:
- 10.16.154.78:6789
- 10.16.154.82:6789
- 10.16.154.83:6789
# by default the path is /, but you can override and mount a specific path of the filesystem by using the path attribute
# path: /some/path/in/side/cephfs
user: admin
secretFile: "/etc/ceph/admin.secret"
readOnly: true

但是引入 persistentVolumeClaim 机制后,这些跟存储类型相关的全局配置信息都可以被封装到一个叫做 storageClass[3] 的对象的描述中,比如针对上面的例子,我们可以创建一个叫做 cephfs 的 storageClass 类型

1
2
3
4
5
6
7
8
9
10
11
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: cephfs
parameters:
adminId: admin
adminSecretName: ceph-admin-secret
adminSecretNamespace: kube-system
monitors: 192.168.213.25:6789,192.168.213.27:6789,192.168.213.28:6789
provisioner: ceph.com/cephfs
reclaimPolicy: Delete

此时,如果用户还是想要申请一块 cephfs 类型的 volume 来使用的话,它只需要创建一个 persistentVolumeClaim 对象即可,并且该对象中应该写明,希望的 storageClass 为 cephfs,并且还有一些其他的配置要求,比如大小,读写模式等等

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: cephfs-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 1Ti
storageClassName: cephfs

此时,这个 pod 的 volume 就可以修改一下改为使用这个 persistentVolumeClaim

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: v1
kind: Pod
metadata:
name: cephfs
spec:
containers:
- name: cephfs-rw
image: kubernetes/pause
volumeMounts:
- mountPath: "/mnt/cephfs"
name: cephfs
volumes:
- name: cephfs
persistentVolumeClaim:
claimName: cephfs-pvc

可见采用 persistentVolumeClaim 的好处有很多,不仅让接口更加清晰,也让用户对存储资源的管理更加方便。

persistentVolumeClaim 管理逻辑

persistentVolumeClaim 机制是现在比较常见的存储使用方式,对于它的理解将非常有助于我们解决对使用 pvc 的 pod 在启动,停止过程中遇到的各种问题。

那么当我们为一个 pod 创建了一个 pvc ,并且把这个 pod 和这个 pvc 绑定之后都发生了什么事情呢?都有哪些组件参与到了这个过程中呢?

我们在这一篇博文中仅粗略介绍一下这个 pvc 被申请,被初始化,再到被 pod 所使用的整个流程,并且每一个步骤我们还会通过单独的博文从代码的角度来分析具体的工作逻辑。并且我是针对我们公司使用的一个具体场景来进行介绍(ceph rbd with external provisioner)

整个 pvc 管理周期主要分为以下几个

  • Provisioning
  • Binding
  • Using

Provisioning

当 PVC 被创建后,第一步就是 Provisioning,PersistentVolumeClaim 从字面上来看也可以看出来,它只是一个对存储资源的需求的声明,针对这个声明,kubernetes 需要创造一个相应的“实体”,才能满足这个声明的需求。那么在 kubernetes 中,这个实体就是 PersistentVolume 对象。

所以如果想要 PersistentVolumeClaim 声明的存储需求被满足,我们就必须有能够满足需求的 PersistentVolume 存在,那么这个 PersistentVolume 是如何被创建出来的呢?

PersistentVolume 有两种途径被创建

  • static provisioning: 顾名思义,需要集群管理员,在 PersistentVolumeClaim 还没创建好的时候就要手动创建好一些 PersistentVolume

  • dynamic provisioning: 顾名思义,这种场景下,会有一个组件,去动态获取新创建的 PersistentVolumeClaim 信息,来直接动态创建和这个 pvc 需求相符的 PersistentVolume

其中 dynamic provisioning 是一种更加灵活的方式,社区中已经针对不同的存储技术开发了不同的组件来完成这个 dynamic provisioning 的流程,详情可以参考 https://github.com/kubernetes-incubator/external-storage

Binding

在 PersistentVolumeClaim 被创建后,而且也有了被创建好的 PersistentVolume 之后,就要开启 Binding 的步骤。很显然就是要为 PersistentVolumeClaim 选取最为合适的 PersistentVolume。PersistentVolumeClaim 和 PersistentVolume 是一对一的关系,不允许有一对多的关系存在。

所以当一个 PersistentVolumeClaim 申请了 100G 的容量时,但是目前系统中仅仅存在 200G 的 PersistentVolume 时,k8s 也会把二者 bind 起来

当然如果 PersistentVolume 是 dynamic provision 的方式创建出来的话,那么这个 PersistentVolumeClaim 和 PersistentVolume 之间的对应关系就已经被建立好了,无需再进行选择。

这个部分的工作使用 kube-controller-manager 中的 PvController 组件来完成的。

另外在 bind 之后,kube-controller-manager 中还存在一个 AttachDetachController,这个 controller 的工作是当某个 pvc 所对应的 pod 被调度到某台机器上面之后,AttachDetachController 会在逻辑上先把这个 PersistentVolume 和这个机器 attach 到一起,保证那些不支持多个 pod 共享的 pv 类型不会被同时两个 pod 所使用。

Using

最后一步就是 using,也就是说,当某一个使用这个 pvc 的 pod 被调度到某一台机器上以后,每台机器上面的 kubelet 会最终完成对这个 PersistentVolume 的实体化初始化,并且最终 mount 到这个 pod 的容器的指定目录里面去。

当然这个过程还有很多细节,比如 kubelet 会首先等待 AttachDetachController 把这个 PV 和所在的机器 attach 完成之后再进行后续操作,另外 PV 对应的 volume 会被首先 mount 到一个全局路径下,然后再 remount 到每一个 pod 的某个路径下。这样做是因为有些 PV 是可以支持被多个 Pod 所同时使用的,等等。这一系列的动作都是有 kubelet 中的 volumeManager 来完成的。

在后面的博文中,我们会针对 volume 整个生命周期中涉及到的所有组件进行一一解析,为大家还原 kubernetes 对 volume 的管理逻辑。

参考引用

  1. https://kubernetes.io/docs/concepts/storage/volumes
  2. https://kubernetes.io/docs/concepts/storage/persistent-volumes/
  3. https://kubernetes.io/docs/concepts/storage/storage-classes
一只肥羊的思考

Go 项目单元测试最佳实践

Posted on 2018-08-19

Go 语言项目中,单元测试是必不可少的,就像我们在这篇博文中介绍的那样,单元测试占所有测试代码的 70% 的比重才算合理,可见单元测试的重要性。

这篇博文中,主要记录了一些常见的 Go 语言单元测试的最佳实践,当然这是一个可能后续会不断丰富完善的文档。

Table Driven Test

table driven test [1][2] 是很多 Go 语言开发者所推崇的测试代码编写方式,Go 语言标准库的测试也是通过这个结构来撰写的,比如 time 库的测试。

整体来讲,table driven test 的基本结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var testcases = []struct {
in string
out string
}{
{"in1", "out1"},
{"in2", "out2"},
}
func TestAFunc(t *testing.T) {
for _, tt := range testcases {
s := A(tt.in)
if s != tt.out {
t.Errorf("got %q, want %q", s, tt.out)
}
}
}

其中可见我们通过匿名结构体构建了每一个测试用例的结构,一个输入 in 和一个我们期望的输出 out,然后在真实的测试函数中,通过 range 轮询每一个测试用例,并且调用测试函数,比较输出结果,如果输出结果不等于我们期望的结果,即报错。

这种测试框架最好的一点在于,结构清晰,并且添加新的测试 case 会非常方便,所以也是最为推荐的一种测试的编写方式。

Test Fixtures, Golden Files

在某些场景下,我们的测试可能需要一些除了测试源码以外的额外的文件,比如我们在测试对 json 文件的解码功能时,就需要一些示例的 json 文件作为测试 case 的输入。

像这种场景,把这些测试过程中用到的辅助文件,通常就叫做 test fixtures[3][4][5]。而放置这些文件最佳的路径,就是测试代码所在路径的一个名字为 testdata 的子目录下面,原因有二

  • 测试代码运行时,他的 working dir 就是测试代码的当前路径
  • go 语言在编译时会忽略名字叫做 testdata 的子目录

所以二者结合,我们应该把这些文件放在这样的路径下

而 Golden Files 又是什么呢?Golden Files 其实就是 test fixtures 中的一种,当测试用例的输出结果比较简单的时候,我们还可以把输出结果写在测试代码中 。但是当输出结果比较复杂时,直接写入代码已经不太合适了。所以此时,我们通常会把正确结果写入到文件里面,并且测试代码运行时,需要读取这个文件的内容进行比较。

一般 Golden File 的使用都会配合 Table Driven Test,每一个测试 case 的 Golden File 的名字一般就会以“这个 case 的名字+.golden” 来命名,这样在编写代码时也会比较简单。

标准库就包含这种使用方法的很多实例,比如对 gofmt 的测试 case:https://golang.org/src/cmd/gofmt/gofmt_test.go

*_test 包

单元测试文件一般被放置在待测试文件的同级目录下,而且文件名称基本和待测试文件名称加上”_test.go” 的规则来进行命名。

那么单测代码应该放在那个 package 中呢?一般没有特殊的需求的话,单元测试应该仅仅关注待测试的 package 的 exported 方法的测试。因为这些 exported 方法是这个 package 对外进行交互的唯一接口,所以必须要保证它的一致性。

所以一般也会将测试代码放置到另一个 package 中,假设待测试的 package 名称叫做 fmt,那么它的单测代码一般所在的包名为 fmt_test。

但是如果你真的需要对一些逻辑比较复杂的非 exported 函数进行测试的话,测试代码一定必须要和待测试的 package 在同一个 package 中。所以此时的最佳实践一般是再构建一个文件,名称为 _internal_test.go。此时这些测试代码的 package 是和这个package 位于同一个 package 中。

所以一个完整的测试代码目录结构应该如下

1
2
3
4
-- gofmt
|-- gofmt.go (package gofmt)
|-- gofmt_internal_test.go (package gofmt)
|-- gofmt_test.go (package gofmt_test)

TBD

  • Mock 技术
  • Http Server 测试
  • 并发测试

Reference

  1. https://dave.cheney.net/2013/06/09/writing-table-driven-tests-in-go
  2. https://github.com/golang/go/wiki/TableDrivenTests
  3. https://speakerdeck.com/mitchellh/advanced-testing-with-go?slide=22
  4. https://medium.com/soon-london/testing-with-golden-files-in-go-7fccc71c43d3
  5. https://dave.cheney.net/2016/05/10/test-fixtures-in-go
  6. https://medium.com/@benbjohnson/structuring-tests-in-go-46ddee7a25c
  7. https://talks.golang.org/2014/testing.slide#15
一只肥羊的思考

Go 语言项目测试最佳实践

Posted on 2018-08-12

自动化测试对于一个产品的快速迭代是非常关键的一环,这篇文章主要关注的就是有关自动化测试方面的一些最佳实践,已经在 Go 语言项目中有关测试代码上的一些具体的最佳实践。

测试包含哪几种

常见的测试类型主要包括

  • 单元测试
  • 集成测试
  • E2E 测试

几种,不同的测试具备不同的特质,也适用于不同的目的。

首先,「单元测试」根据定义指的是针对代码中的某个独立单元,(比如一个类,一个方法)的测试方法。可见这种测试的优势就在于

  • 编写简单
  • 定位错误精准

但是缺陷也显而易见

  • 测试范围有限

再看「集成测试」,集成测试要比单元测试的范围更大一点,一般涉及到多个模块之间的配合测试,自然而然,他的编写难度也要比单元测试难一些。但是他可以很好的快速发现多个模块单独测试没有问题,但是一起协同工作出现问题的场景。

另外 kubernetes 社区对于集成测试的约束也很有意思,他认为集成测试还应该满足所有测试需要的服务都应该在一台机器上启动。

最后「E2E 测试」,E2E 测试则更多的是模拟用户的行为对整个产品进行整体测试,比如开发一款 app,E2E 测试可能会模拟一个用户,注册,登录,点击产品等等一系列行为。所以这种测试方法的优势在于

  • 快速确认产品是否存在 bug,并且 bug 是真实用户会感知的

虽然能够确认有 bug ,但是由于 bug 可能是 E2E 测试中涉及到的多个组件中的任何一个,所以它的缺陷也很明显

  • 无法快速定位问题代码
  • 编写难度也是最大的

所以也正是基于上述不同测试的优缺点,Google 也总结出针对这几种测试的比例的最佳实践。

As a good first guess, Google often suggests a 70/20/10 split: 70% unit tests, 20% integration tests, and 10% end-to-end tests.

也就是说一个稳定的项目,它的单元测试,集成测试,E2E 测试的数量应该以 7:2:1 的比例分布,形成一个金字塔的形状,塔底是大量的单元测试,塔顶是少量的 E2E 测试。

下面我们也将通过单独的博文分别介绍,单元测试,集成测试,E2E 测试各自的最佳实践。

一只肥羊的思考

kubelet 的 Pod 信息源

Posted on 2018-08-12

背景简介

在之前的博文中,我们总结过,kubelet 整体是工作在「生产者与消费者」的模型之下的。

而其中生产者生产的消息就是有关「pod 发生变化」的通知,那么 kubelet 都有哪些消息生产者呢?

在这一篇博文中,我们主要关注的就是 kubelet 中的信息生产者,以及他们都生产了哪些消息

生产者介绍

首先我们可以再回顾一下 kubelet 中一段比较重要的函数 syncLoopIteration,这个是 kubelet 中最重要的循环函数,这个函数的内容就是在不停的通过 watch 所有消息生产者带来的消息,从而最终启动对应的 worker 函数去处理。

而在这个函数中我们就可以看到所有的消息生产者了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
return true
}

我们可以看到,一共有 5 在阻塞等待的 channel,其中 syncCh,housekeepCh,都是周期性触发的信号,属于被动的消息。而真正主动的消息生产者就是

  • configCh:这个 channel 是由 Dependencies 类型中 PodConfig 对象提供的 channel);该对象主要用于管理来自 3 个不同的信息源所发送的 pod 信息,apiserver,file,http,这也是 kubelet 中最为重要的一个 pod 事件源。这个事件源主要关注的是 Pod 静态信息(PodSpec)的变化。

  • plegCh:这个 channel 是由 Kubelet 对象中 pleg 子模块提供的 channel,这个子模块关注的是 Pod 动态信息的变化,比如 Container 的状态变化(已启动,已停止)。

  • update:这个 channel 是有 Kubelet 对象中的 livenessManager 子模块提供的 channel,这个子模块关注的是有关 Container 的 Liveness Probe 结果信息。

下面我们分别对这 3 个事件源的具体实现进行介绍。

PodConfig

在上一小节,我们简要介绍了,PodConfig 对象主要是监听来自 3 种不同来源的 pod 信息,进行汇总发送给 kubelet。而它所发送的消息格式是由 PodUpdate 这个类型来描述的

1
2
3
4
5
type PodUpdate struct {
Pods []*v1.Pod
Op PodOperation
Source string
}

kubelet 就是通过 watch PodConfig 对象传递给它的 PodUpdate 类型的 channel 来获取信息的

1
2
3
4
5
6
7
8
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
...
}

那么 PodConfig 自身又是如何工作的呢?

PodConfig 为每一个信息源(file,http,apiserver),分别创建了两个资源

  • 一个 channel: 用于接受该信息源发送过来的 pod 信息
  • 一个 本地缓存 map: 用于暂时缓存在这个机器上, 最近的来自这个源的所有 pod 的信息

每一个信息源,都会向 PodConfig 为它提供的 channel 中发送他获取的有关这个来源的 pod 信息。但是这些信息很可能会有重复的现象,所以 PodConfig 在获取到这些信息的时候,还会最终调用一个 Merge 函数,该函数会基于这个源的 pod 信息缓存来判断到底有哪些对 pod 的更新是真的需要关注的,以及到底是怎样的更新行为,比如创建,删除,更新等等。

举个栗子,apiserver 这个信息源对象,它是一个以 cache.NewUndeltaStore 为缓存的 Reflector 类型的对象,下面是它的初始化函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}

如果你不了解这个 cache.NewUndeltaStore 还有 Reflector 对象也没有关系,总之,这个 apiserver 的行为是这样的:无论是任何一个或者多个 pod 的创建,删除还是更新操作,都会导致这个对象把它当前得到所有和这个机器相关的 pod 信息都发送给 PodConfig 给他的 channel。自然这会导致大量的重复,所以 PodConfig 需要进行 Merge 过滤。

PodConfig 的 Merge 函数的具体细节如下

1
2
3
4
5
6
7
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
...
adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
...
}

可见其中会调用一个 merge 函数,该函数作用就是针对不同的源发送过来的消息进行过滤筛选,筛选出哪些 pod 的新创建的,对应的就是返回值 adds;哪些是更新的,对应的就是返回 updates 等等。

而 merge 函数作为过滤器其中最重要的就是它如何判断当前信息的类型的子函数,就是 checkAndUpdatePod 子函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// checkAndUpdatePod updates existing, and:
// * if ref makes a meaningful change, returns needUpdate=true
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
// * else return all false
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
// 1. this is a reconcile
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if !podsDifferSemantically(existing, ref) {
// this is not an update
// Only check reconcile when it is not an update, because if the pod is going to
// be updated, an extra reconcile is unnecessary
if !reflect.DeepEqual(existing.Status, ref.Status) {
// Pod with changed pod status needs reconcile, because kubelet should
// be the source of truth of pod status.
existing.Status = ref.Status
needReconcile = true
}
return
}
...
if ref.DeletionTimestamp != nil {
needGracefulDelete = true
} else {
// 3. this is an update
needUpdate = true
}
}

从上面的函数体也可以看出来,kubelet 是如何定义这个 pod 是需要 update 的还是 reconcile 的,还是需要 delete 的。

最终 PodConfig 会把来自 3 个来源的信息经过过滤后都统一发送到一个 channel 里,也就是 kubelet 对象 watch 的 channel。

PLEG

在一开始的时候,我们也介绍过,PodConfig 关注的是 Pod 的静态信息的变化,而 PLEG 模块则关注的是有关 Pod 动态信息的变化了,主要指的就是 Container 的状态。

所以 PLEG 子模块工作逻辑也比较简单,就是一个周期性执行的 relist 函数,该函数的操作主要包括

  • 访问 container runtime 获取当前所有 Pod 的 Container 的最新状态信息
  • 根据缓存中记录的上一次 PLEG 获取的 Container 信息,进行比较,如果有变化则生成一个 PodLifecycleEvent 信息
  • 根据整理出来的所有有 PodLifecycleEvent 事件发生的 Pod,依次再从 container runtime 或者这个 pod 最新状态细节,更新到本地缓存中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (g *GenericPLEG) relist() {
...
// 访问 container runtime 获取当前所有 Pod 的 Container 的最新状态信息
podList, err := g.runtime.GetPods(true)
...
// 根据缓存中记录的上一次 PLEG 获取的 Container 信息,进行比较,如果有变化则生成一个 PodLifecycleEvent 信息
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
...
// 根据整理出来的所有有 PodLifecycleEvent 事件发生的 Pod,依次再从 container runtime 或者这个 pod 最新状态细节,更新到本地缓存中
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
...
if err := g.updateCache(pod, pid); err != nil {
glog.Errorf("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod
continue
}
}
...
}

所以这个消息源产生的消息类型为 PodLifecycleEvent,其中就包括了这个 Pod UID,以及发生的时间的类型,以及一些相关重要数据。

1
2
3
4
5
6
7
8
9
10
type PodLifecycleEvent struct {
// The pod ID.
ID types.UID
// The type of the event.
Type PodLifeCycleEventType
// The accompanied data which varies based on the event type.
// - ContainerStarted/ContainerStopped: the container name (string).
// - All other event types: unused.
Data interface{}
}

之所以 k8s 引入 PLEG 这种方案,就是为了能够尽可能减少 kubelet 对 container runtime 的 api 频繁调用,导致 CPU 利用率升高,kubelet 可扩展性降低。更多的具体细节可以参考 [1]

LivenessManager

livenessManager 是实现针对 container 进行 liveness probe 的子模块。Pod 中的每一个 container 都可以配置 Liveness Probe 特性,其中可以指定 3 种不同的 Probe 行为

  • http
  • tcp
  • exec

具体的使用方式可以参考[2]

而真实对 container 进行 liveness probe 的模块就是 kubelet 对象中的 livenessManager 以及 probeManager

其中 probeManager 对象会针对每一个配置了 liveness probe 或者 readiness probe 的 pod 分配一个 worker 对象,并且保存在一个 map 中,每个 worker 会启动一个 goroutine 对 container 进行指定的 probe 操作。

如下就是 probeManager 对象的类型 manager 的声明,

1
2
3
4
5
6
7
8
9
10
11
12
type manager struct {
// Map of active workers for probes
workers map[probeKey]*worker
...
// livenessManager manages the results of liveness probes
livenessManager results.Manager
// prober executes the probe actions.
prober *prober
}

其中比较重要的就是 map workers ,里面针对每一个需要 liveness probe 或者 readiness probe 的 container 都保存了一个 worker 对象。而这个 worker 对象会最终启动一个周期性的 probe goroutine,执行 liveness probe 的行为,并且把结果传输给 livenessManager 对象中。

而 livenessManager 对象所属的类型 result.Manager 其实也就是一个 channel,当 worker 进行 probe 的 goroutine 发现 liveness probe 有问题时,就会把这个消息传递给这个 channel,从而 kubelet 就可以 watch 到这个事件,从而进行异常处理。比如重启这个容器。

参考引用

  1. https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md
  2. https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
一只肥羊的思考

kubelet创建Pod流程解析

Posted on 2018-07-22

简介

这篇文章主要是针对 kubelet 对 pod 的生命周期管理逻辑中,有关 pod 的创建的过程的解析。

就像我们在之前的文章中解释的那样,kubelet 整体是工作在生产者,消费者模型中的,所以当 一个 pod 创建的事件进入队列后,kubelet 就会及时收到这个事件,然后调用一系列相关的子模块,完成 pod 的创建工作。

流程解析

kubelet 针对 Pod 创建实现了一个 Handler 函数,HandlePodAdditions,该函数中包含pod 在创建的过程中的所有操作。我们就依次来看一下所有设计到的操作:

Step 1: 添加新创建的 Pod 到 podManager 子模块中

podManager 子模块负责管理这台机器上的 pod 的信息,pod 和 mirrorPod 之间的对应关系等等。

在创建 pod 时第一步操作就是把这个 Pod 的信息存入 podManager 中,供后续子模块查询使用

1
2
3
4
5
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
....
kl.podManager.AddPod(pod)
....
}

代码链接

Step 2: 将创建 Pod 的工作下发给 podWorkers 子模块

podWorkers 子模块 主要负责为每一个 Pod 启动,并且创建单独的 goroutine 去真实的对这个 Pod 完成相应的操作,比如创建,更新,删除等等。

而 HandlePodAdditions 中的对 dispatchWork 函数的调用就会真实的把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers,在这里我们关注的是 Pod 的创建动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
...
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
...
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
...
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
...
}

Step 3: podWorkers 子模块为该 Pod 启动一个处理对 Pod 的更新事件的 goroutine

podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和 更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。

所以 podWorker 创建针对每一个的 Pod 的【处理 goroutine】 还有【下发事件给处理 goroutine】的动作都是在 UpdatePod 函数中完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
...
if podUpdates, exists = p.podUpdates[uid]; !exists { // 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel
// 创建 channel
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// 启动 goroutine
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
if !p.isWorking[pod.UID] { // 下发更新事件
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
....
}

Step 4: 处理 pod 更新事件的 goroutine 开始操作

podWorkers 为每一个 pod 启动一个处理事件的 goroutine,goroutine 主要调用的是 managePodLoop 函数,这个函数内部将会利用 range 阻塞式的等待更新事件的到来,直到 channel 被关闭。

针对每一个事件 goroutine 会调用 kubelet 对象的 syncPodFn 函数真实的完成这次更新行为。

在完成这次 sync 动作之后,会调用 wrapUp 函数,这个函数将会做几件事情

  • 将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync
  • 将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
...
for update := range podUpdates {
...
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
...
p.wrapUp(update.Pod.UID, err)
}

在 managePodLoop 中调用的 syncPodFn 的实现其实是 kubelet 对象的 syncPod 方法

在这个方法中,主要完成几件事情

  • 首先根据从 container runtime 那边获取的当前 pod 所有容器的状态,以及 pod 的描述信息得出 pod 整体的状态,并且发送给 kubelet.statusManager。而 statusManager 的工作就是周期性的把 podStatus 同步到 apiserver 去。
  • 然后调用 kubelet.containerManager 的 NewPodContainerManager 方法,为这个 pod 创建一个 cgroup 的 manager,这个 manager 对象专门用户为这个 pod 创建,并且管理 pod 级别的 cgroup。所以紧接着,针对 pod 的创建事件,这个 pod 的 cgroup manager 主要做两件事情
    • 创建 pod level cgroup
    • 更新这台机器的 qos level cgroup
  • 然后为这个 pod 创建一系列必要的本地目录,主要用于存放 kubelet 的元数据,pod 内容器的挂载点等等。
  • 然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好。
  • 最终调用 kubelet.containerRuntime 组件,创建这个 pod 实体。containerRuntime 组件也是 kubelet 中用于负责同 container runtime 进行交互的组件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// 1. 同步 podStatus 到 kubelet.statusManager
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
...
kl.statusManager.SetPodStatus(pod, apiPodStatus)
...
// 2. 创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup
pcm := kl.containerManager.NewPodContainerManager()
...
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
...
if err := pcm.EnsureExists(pod); err != nil {
...
// 3. 为 pod 创建数据目录
if err := kl.makePodDataDirs(pod); err != nil {
...
// 4. 等待 kubelet.volumeManager 为 pod 完成必要的 volume 准备
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
// 5. 最终调用 kubelet.containerRuntime 的 SyncPod 方法,完成 pod 实体的创建
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
...
}

Step 5: 创建 Pod 实体

在上面一个步骤中,我们看到主要完成的都是创建 Pod 实体(即容器)之前需要完成的准备工作。在准备工作完成之后,最终调用 kubelet.containerRuntime 子模块的 SyncPod 函数真正完成 Pod 内容器实体的创建。

这个 SyncPod 方法的实现位于 kuberuntime 包中,其中主要完成几个步骤

  • 创建 PodSandbox 容器
  • 如果 Pod 中指定了 initContainers 的话,则创建 InitContainer 容器
  • 创建用户业务 Container
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
...
// 1. 创建 PodSandbox 容器
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
...
// 2. 启动 initContainer 容器
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
...
// 3. 启动业务容器
for _, idx := range podContainerChanges.ContainersToStart {
...
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
...
}

当所有一切动作都完成之后,一个完整的 Pod 就已经创建起来了!

一只肥羊的思考

kubelet内部实现解析

Posted on 2018-07-08

简介

kubelet 组件作为每台 kubernetes 集群中每台计算节点上运行的 agent,主要职责有两个

  • 管理这台机器上面的 pod 的生命周期,
  • 自动上报,并且维护这台计算节点的状态

这篇文章的目的就在于对 kubelet 整体的设计框架进行分析。并不会对任意一个子功能的细节进行深入的介绍。后续会有一系列文章对 kubelet 组件的各种功能进行详细的介绍。

kubelet 内部结构

kubelet 内部其实是由多个【子模块】来构成的,每个子模块都单独负责一部分的任务,而在代码中,所有的子模块对象都包含在下面两个对象中,这两个对象也是 kubelet 中最重要的两个对象

  • kubeDeps:该对象(类型为 Dependencies)主要包含一些 kubelet 依赖的外部功能,比如 cadvisor(监控功能),containerManager(cgroup 管理功能)。

  • kubelet:kubelet 对象(类型为 Kubelet)则代表 kubelet 内部跟 pod 息息相关的子模块,比如 podManager(pod 信息存储模块),probeManager(pod 测活模块)等等。

那么 kubelet 中各个子模块之间又是如何配合工作的呢?主要是基于生产者消费者的模型。

整个 kubelet 的工作模式就是在围绕着不同的生产者生产出来的不同的有关 pod 的消息来调用相应的消费者(不同的子模块)完成不同的行为,比如创建 pod,删除 pod,如下图所示

那么 kubelet 中主要包含哪几个消息的生产者呢?消费者又是怎么消费的呢?

我们可以在 kubelet 的 syncLoopIteration 函数中看到 kubelet 到底同时接收哪几个信息源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(...) {
case u, open := <-configCh:
...
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
}

通过代码注释可以看出,kubelet 主要有 5 个不同的信息源

  • configCh: 该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。
  • plegCh: 该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态,如果状态发生变化,则这个 channel 产生事件。[1]
  • syncCh: 该信息源是一个周期性的信号源(默认1秒),周期性同步所有需要再次同步的 pod。
  • liveness manager update: 该信息源是由 kubelet 对象中 livenessManager管理,当某个容器的 liveness probe 状态发生了变化,则会产生事件。
  • housekeepingCh: 该信息源也是一个周期性信号源(默认2秒),周期性的清理一些无用 pod。

所有的这些消息源产生的消息都由 kubelet 对象统一接受,并且调用相应的功能函数来完成相应的操作。

kubelet 对象自身实现一系列处理不同事件的 handler 函数,并且汇总成 SyncHandler 接口,其中包含针对不同信息源里不同消息类型的处理函数

1
2
3
4
5
6
7
8
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups() error
}

当然,每一个处理函数背后可能都需要 kubelet 对象去调用背后多个内部子模块来共同完成,比如 HandlePodAddition 函数,处理 Pod 的创建,其中可能需要

  • 调用 kubelet.podManager 子模块 AddPod 函数,注册该 pod 信息
  • 调用 kubelet.podWorker 子模块为这个 Pod 创建单独的 worker goroutine 完成具体的操作
  • 调用 kubelet.containerManager 子模块为这个 Pod 创建相应的 Pod Level Cgroup
  • 调用 kubelet.volumeManager 子模块为这个 Pod 准备需要被 Mount 到容器中的文件系统
  • 调用 kubelet.containerRuntime 子模块真正的创建 Pod 的实体
  • ….

所以综上,整个 kubelet 的所有内部子模块就是通过这种生产者消费者模型协调工作,及时将 Pod 以用户期望的状态维护在它所在的机器上。

上面说到的只是 kubelet 中和 pod 管理相关的结构,kubelet 中还包括一些为了

  • 维护物理机稳定性
  • 同步更新物理机配置

等目的,周期性不间断工作的子模块,他们也是 kubelet 中非常重要的一部分。

所以这篇文章作为一个综述,是一系列文章的开端,我将通过一系列博文来解析 kubelet 的内部结构的细节,针对 kubelet 每一个比较重要的子模块,子功能进行单独的介绍。

参考引用

  1. https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md
Ziqi Zhao (fatsheep9146)

Ziqi Zhao (fatsheep9146)

6 posts
3 tags
© 2018 Ziqi Zhao (fatsheep9146)
Powered by Hexo
Theme - NexT.Muse