containerd源码阅读(1)--框架篇
Overview
1. 简介
containerd是与docker直接沟通的下属组件,详细是什么不说了。 每个docker daemon启动的时候都会启动一个containerd daemon,启动容器的时候, 每个容器的init进程/exec进程都会对应一个containerd-shim进程, containerd-shim同样是containerd库里面单独的一个二进制程序, containerd-shim会调用runc最终启动容器。 这些基本的知识一笔带过不详细展开。
随着docker改名为moby,docker的大部分功能,比如image管理,容器运行都会下沉到containerd, docker会越来越侧重于编排调度部分--swarm。
简单分析下containerd的代码架构,作为官方containerd文档的一个补充。
本篇以git commit d700a9c35b09239c8c056cd5df73bc19a79db9a9
为标准讲解。
2. grpc
containerd的架构极其依赖grpc协议。
1# ls api/services/
2containers content diff events images namespaces snapshot tasks version
api/services
目录下存放着containerd提供的不同服务对应的grpc接口。
以比较基础的content服务为例,
1# ls api/services/content/v1/
2content.pb.go content.proto
下面一共有两个文件,一个content.proto一个是.pb.go, 其中用户只需要定义content.proto文件,
而程序最终使用的content.pb.go则可以由grpc命令自动生成。
containerd在Makefile中提供了生成.pb.go
的指令
1// 安装依赖的库
2# cd containerd && make setup
3# make protos
打开content.proto 来看,里面主要定义了一个service:
113 service Content {
2 14 // Info returns information about a committed object.
3 15 //
4 16 // This call can be used for getting the size of content and checking for
5 17 // existence.
6 18 rpc Info(InfoRequest) returns (InfoResponse);
7 19
8 20 // Update updates content metadata.
9 21 //
10 22 // This call can be used to manage the mutable content labels. The
11 23 // immutable metadata such as digest, size, and committed at cannot
12 24 // be updated.
13 25 rpc Update(UpdateRequest) returns (UpdateResponse);
14 26
15 27 // List streams the entire set of content as Info objects and closes the
16 28 // stream.
17 29 //
18 30 // Typically, this will yield a large response, chunked into messages.
19 31 // Clients should make provisions to ensure they can handle the entire data
20 32 // set.
21 33 rpc List(ListContentRequest) returns (stream ListContentResponse);
22 34
23 35 // Delete will delete the referenced object.
24 36 rpc Delete(DeleteContentRequest) returns (google.protobuf.Empty);
25...省略...
以及很多message结构体,message可以理解成是service定义的接口使用到的结构体,是通信的结构化的数据流。
使用protoc命令生成的.pb.go文件内同步包含server端和client端的接口实现。
3. containerd启动
以containerd启动过程来看。入口为cmd/containerd/main.go, 程序一启动首先就把信号处理函数准备好了。
1cmd/containerd/main.go:
2 85 done := handleSignals(ctx, signals, serverC)
3 86 // start the signal handler as soon as we can to make sure that
4 87 // we don't miss any signals during boot
5 88 signal.Notify(signals, handledSignals...)
后面都是准备并启动grpc server。核心是准备server的这一句
1cmd/containerd/main.go:
2106 server, err := server.New(ctx, config)
进去来看server.New的实现。
前面都是创建目录,主要看加载plugin的部分。
- 3.1. load plugins
1server/server.go:
2func New(ctx context.Context, config *Config) (*Server, error):
3
452 plugins, err := loadPlugins(config)
核心代码:
1162 func loadPlugins(config *Config) ([]*plugin.Registration, error) {
2163 // load all plugins into containerd
3164 if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil {
4165 return nil, err
5166 }
6167 // load additional plugins that don't automatically register themselves
7168 plugin.Register(&plugin.Registration{
8169 Type: plugin.ContentPlugin, // "io.containerd.content.v1"
9170 ID: "content",
10171 Init: func(ic *plugin.InitContext) (interface{}, error) {
11172 return local.NewStore(ic.Root)
12173 },
13174 })
14175 plugin.Register(&plugin.Registration{
15176 Type: plugin.MetadataPlugin, // "io.containerd.metadata.v1"
16177 ID: "bolt",
17178 Init: func(ic *plugin.InitContext) (interface{}, error) {
18179 if err := os.MkdirAll(ic.Root, 0711); err != nil {
19180 return nil, err
20181 }
21182 return bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil)
22183 },
23184 })
24185
25186 // return the ordered graph for plugins
26187 return plugin.Graph(), nil
27188 }
164行进入plugin包,内部实现是golang从1.8(?)开始支持的新特性--go语言自带的plugin支持, 可以加载用户自定义的插件。
168和175是注册了两个最基本的插件,一个是content
插件,一个是metadata
插件,这两个插件基本上是其他插件的基础。
其中content插件主要是依赖content/local那个子package,metadata主要是操纵boltdb数据库meta.db
1root@ubuntu:~/gocode/src/github.com/containerd/containerd# ls /var/lib/containerd/
2io.containerd.content.v1.content io.containerd.runtime.v1.linux io.containerd.snapshotter.v1.overlayfs
3io.containerd.metadata.v1.bolt io.containerd.snapshotter.v1.btrfs
4root@ubuntu:~/gocode/src/github.com/containerd/containerd# ls /var/lib/containerd/io.containerd.metadata.v1.bolt/
5meta.db
plugin.Register
函数其实很简单,就是把某个Registration结构体加入到plugin包的register全局结构体内:
1 59 var register = struct {
2 60 sync.Mutex
3 61 r []*Registration
4 62 }{}
上面的r即承载着所有的Registration结构体。
上文中提到的loadPlugins最后return plugin.Graph()
同样定义在plugin包里,
里面根据Registration.Requires对所有Registration进行了排序,
给出了一个按依赖关系排序的插件数组。比方说plugin a, b, c, 其中b定义了requires a, c定义了requires b,
那么最终给出的排序的插件数组就是[a, b, c] 而不是[b, a, c]或其他。
但是Registration难道只有两个吗?两个插件为什么需要这么复杂? 答案是当然不是只有两个,还有其他的插件,只是他们初始化过程比较隐晦,不是那么直观。
- 3.2. 其他插件在哪儿?
答案是在以下两个文件中:
1cmd/containerd/builtins.go:
2 3 // register containerd builtins here
3 4 import (
4 5 _ "github.com/containerd/containerd/differ"
5 6 _ "github.com/containerd/containerd/services/containers"
6 7 _ "github.com/containerd/containerd/services/content"
7 8 _ "github.com/containerd/containerd/services/diff"
8 9 _ "github.com/containerd/containerd/services/events"
9 10 _ "github.com/containerd/containerd/services/healthcheck"
10 11 _ "github.com/containerd/containerd/services/images"
11 12 _ "github.com/containerd/containerd/services/namespaces"
12 13 _ "github.com/containerd/containerd/services/snapshot"
13 14 _ "github.com/containerd/containerd/services/tasks"
14 15 _ "github.com/containerd/containerd/services/version"
15 16 )
以及(以linux平台为例,其他平台的文件见其他后缀文件):
1cmd/containerd/builtins_linux.go:
2 3 import (
3 4 _ "github.com/containerd/containerd/linux"
4 5 _ "github.com/containerd/containerd/metrics/cgroups"
5 6 _ "github.com/containerd/containerd/snapshot/overlay"
6 7 )
其中import _ "xxx" 就代表着只执行这个包的init函数,但是不使用这个包的任何函数。
以services/content
为例:
1services/content/service.go:
2 38 func init() {
3 39 plugin.Register(&plugin.Registration{
4 40 Type: plugin.GRPCPlugin, // "io.containerd.grpc.v1"
5 41 ID: "content",
6 42 Requires: []plugin.PluginType{
7 43 plugin.ContentPlugin,
8 44 plugin.MetadataPlugin,
9 45 },
10 46 Init: NewService,
11 47 })
12 48 }
init()
函数是golang的基本用法,会在这个package被引用到的时候自动初始化执行。
这里就是注册了另一个plugin.
需要注意的是,plugin.GRPCPlugin
这个类型的插件有不止一种,一般都是通过grpc service对外提供服务的。
在上面提到的import的其他包里,你可以找到很多GRPCPlugin类型的插件。
这个插件依赖于plugin.ContentPlugin
和plugin.MetadataPlugin
,
也就是说初始化过程中,一定会先初始化它依赖的ContentPlugin和MetadataPlugin再初始化它。
Init函数指向NewService这个函数。这个函数本文后面会继续打开来看,我们先暂停到这里。
到此,我们知道了所有plugin都是在哪里找到的。
- 3.3. 注册和启动service
继续回到server.New
的实现中,loadPlugins
完成之后,所有的plugin都加入到plugins
这个数组中了,
下一步就是处理这个数组。下面是一段长长的代码:
1server/server.go:
2func New(ctx context.Context, config *Config) (*Server, error):
3
4 68 for _, p := range plugins {
5 69 id := p.URI() // fmt.Sprintf("%s.%s", r.Type, r.ID)
6 70 log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
7 71
8 72 initContext := plugin.NewContext(
9 73 ctx,
10 74 initialized,
11 75 config.Root, // 默认是"/var/lib/containerd"
12 76 config.State, // 默认是"/run/containerd"
13 77 id,
14 78 )
15 79 initContext.Events = s.events
16 80 initContext.Address = config.GRPC.Address // 默认是"/run/containerd/containerd.sock"
17 81
18 82 // load the plugin specific configuration if it is provided
19 83 if p.Config != nil {
20 84 pluginConfig, err := config.Decode(p.ID, p.Config)
21 85 if err != nil {
22 86 return nil, err
23 87 }
24 88 initContext.Config = pluginConfig
25 89 }
26 90 instance, err := p.Init(initContext)
27 91 if err != nil {
28 92 if plugin.IsSkipPlugin(err) {
29 93 log.G(ctx).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
30 94 } else {
31 95 log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
32 96 }
33 97 continue
34 98 }
35 99
36100 if types, ok := initialized[p.Type]; ok {
37101 types[p.ID] = instance
38102 } else {
39103 initialized[p.Type] = map[string]interface{}{
40104 p.ID: instance,
41105 }
42106 }
43107 // check for grpc services that should be registered with the server
44108 if service, ok := instance.(plugin.Service); ok {
45109 services = append(services, service)
46110 }
47111 }
48112 // register services after all plugins have been initialized
49113 for _, service := range services {
50114 if err := service.Register(rpc); err != nil {
51115 return nil, err
52116 }
53117 }
90行之前都是准备initContext,这个initContext是会传递给每个plugin的Init函数使用的一个初始化数据。
随后重点是90行,会调用每个plugin的Init函数,入参为刚才准备的initContext。
initialized
数组每一轮迭代都会把当前初始化完成的插件放进去,然后传递给下一个plugin
的initContext
作为初始化必须的数据,
下一个插件就可以访问它所依赖的任何一个组件了。
108行需要注意的是,每个插件执行完Init
函数所返回的instance
interface,都会尝试去转换成plugin.Service
接口,
如果它实现了plugin.Service
这个接口,那么它就是一个service,需要加到services
列表,
等待最后在114行执行Register函数进行注册。
1plugin/plugin.go:
2 55 type Service interface {
3 56 Register(*grpc.Server) error
4 57 }
也即是说,只要instance
实现了Register
接口,就是一个服务。
仍然以content service为例。
1services/content/service.go:
2 38 func init() {
3 39 plugin.Register(&plugin.Registration{
4 40 Type: plugin.GRPCPlugin,
5 41 ID: "content",
6 42 Requires: []plugin.PluginType{
7 43 plugin.ContentPlugin,
8 44 plugin.MetadataPlugin,
9 45 },
10 46 Init: NewService,
11 47 })
12 48 }
13 49
14 50 func NewService(ic *plugin.InitContext) (interface{}, error) {
15 51 c, err := ic.Get(plugin.ContentPlugin)
16 52 if err != nil {
17 53 return nil, err
18 54 }
19 55 m, err := ic.Get(plugin.MetadataPlugin)
20 56 if err != nil {
21 57 return nil, err
22 58 }
23 59 cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store))
24 60 return &Service{
25 61 store: cs,
26 62 publisher: ic.Events,
27 63 }, nil
28 64 }
29 65
30 66 func (s *Service) Register(server *grpc.Server) error {
31 67 api.RegisterContentServer(server, s)
32 68 return nil
33 69 }
可以看到content plugin的Init函数返回了content.Service
结构体,这个结构体实现了Register
函数,
它是一个service。
其中67行会跳转到以下:
1api/services/content/v1/content.pb.go:
2 619 // Server API for Content service
3 620
4 621 type ContentServer interface {
5 622 // Info returns information about a committed object.
6 623 //
7 624 // This call can be used for getting the size of content and checking for
8 625 // existence.
9 626 Info(context.Context, *InfoRequest) (*InfoResponse, error)
10 627 // Update updates content metadata.
11 628 //
12 629 // This call can be used to manage the mutable content labels. The
13 630 // immutable metadata such as digest, size, and committed at cannot
14 631 // be updated.
15 632 Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
16 633 // List streams the entire set of content as Info objects and closes the
17 634 // stream.
18 635 //
19 636 // Typically, this will yield a large response, chunked into messages.
20 637 // Clients should make provisions to ensure they can handle the entire data
21 638 // set.
22 639 List(*ListContentRequest, Content_ListServer) error
23 640 // Delete will delete the referenced object.
24 641 Delete(context.Context, *DeleteContentRequest) (*google_protobuf3.Empty, error)
25...省略...
26
27 677 func RegisterContentServer(s *grpc.Server, srv ContentServer) {
28 678 s.RegisterService(&_Content_serviceDesc, srv)
29 679 }
也就是content.Service
必须是ContentServer
的一个实现。
下面一句划重点:
api/services
包定义了所有用户自定义的grpc服务的接口,其中.proto
文件包含了用户自定义的service接口,
而.pb.go
是protoc自动生成的service定义,包含server端和client端定义;services/
包里实现了api/services/
用户定义的接口
api/services
包含的是接口定义,services
包是实现。
在上文中services/content/service.go
包含了content service的server端实现,而services/content/store.go
对client端做了封装,
更加便于使用。
4. 总结
上文对containerd的启动流程做了总结,主要是围绕containerd如何启动多个grpc service给出分析的。grpc可以说是containerd的实现核心, 与docker daemon的http restful API还是有较大不同。后续会针对部分单独的组件再来做分析。