containerd源码阅读(1)--框架篇

Overview

1. 简介

containerd是与docker直接沟通的下属组件,详细是什么不说了。 每个docker daemon启动的时候都会启动一个containerd daemon,启动容器的时候, 每个容器的init进程/exec进程都会对应一个containerd-shim进程, containerd-shim同样是containerd库里面单独的一个二进制程序, containerd-shim会调用runc最终启动容器。 这些基本的知识一笔带过不详细展开。

随着docker改名为moby,docker的大部分功能,比如image管理,容器运行都会下沉到containerd, docker会越来越侧重于编排调度部分--swarm。

简单分析下containerd的代码架构,作为官方containerd文档的一个补充。 本篇以git commit d700a9c35b09239c8c056cd5df73bc19a79db9a9 为标准讲解。

2. grpc

containerd的架构极其依赖grpc协议。

1# ls api/services/
2containers  content  diff  events  images  namespaces  snapshot  tasks  version

api/services目录下存放着containerd提供的不同服务对应的grpc接口。 以比较基础的content服务为例,

1# ls api/services/content/v1/
2content.pb.go  content.proto

下面一共有两个文件,一个content.proto一个是.pb.go, 其中用户只需要定义content.proto文件, 而程序最终使用的content.pb.go则可以由grpc命令自动生成。 containerd在Makefile中提供了生成.pb.go的指令

1// 安装依赖的库
2# cd containerd && make setup 
3# make protos

打开content.proto 来看,里面主要定义了一个service:

 113 service Content {
 2 14     // Info returns information about a committed object.
 3 15     //
 4 16     // This call can be used for getting the size of content and checking for
 5 17     // existence.
 6 18     rpc Info(InfoRequest) returns (InfoResponse);
 7 19 
 8 20     // Update updates content metadata.
 9 21     //
10 22     // This call can be used to manage the mutable content labels. The
11 23     // immutable metadata such as digest, size, and committed at cannot
12 24     // be updated.
13 25     rpc Update(UpdateRequest) returns (UpdateResponse);
14 26 
15 27     // List streams the entire set of content as Info objects and closes the
16 28     // stream.
17 29     //
18 30     // Typically, this will yield a large response, chunked into messages.
19 31     // Clients should make provisions to ensure they can handle the entire data
20 32     // set.
21 33     rpc List(ListContentRequest) returns (stream ListContentResponse);
22 34 
23 35     // Delete will delete the referenced object.
24 36     rpc Delete(DeleteContentRequest) returns (google.protobuf.Empty);
25...省略...

以及很多message结构体,message可以理解成是service定义的接口使用到的结构体,是通信的结构化的数据流。

使用protoc命令生成的.pb.go文件内同步包含server端和client端的接口实现。

3. containerd启动

以containerd启动过程来看。入口为cmd/containerd/main.go, 程序一启动首先就把信号处理函数准备好了。

1cmd/containerd/main.go:
2 85         done := handleSignals(ctx, signals, serverC)
3 86         // start the signal handler as soon as we can to make sure that
4 87         // we don't miss any signals during boot
5 88         signal.Notify(signals, handledSignals...)

后面都是准备并启动grpc server。核心是准备server的这一句

1cmd/containerd/main.go:
2106	server, err := server.New(ctx, config)

进去来看server.New的实现。

前面都是创建目录,主要看加载plugin的部分。

  • 3.1. load plugins
1server/server.go:
2func New(ctx context.Context, config *Config) (*Server, error):
3 
452     plugins, err := loadPlugins(config)

核心代码:

 1162 func loadPlugins(config *Config) ([]*plugin.Registration, error) {
 2163     // load all plugins into containerd
 3164     if err := plugin.Load(filepath.Join(config.Root, "plugins")); err != nil {
 4165         return nil, err
 5166     }
 6167     // load additional plugins that don't automatically register themselves
 7168     plugin.Register(&plugin.Registration{
 8169         Type: plugin.ContentPlugin,  // "io.containerd.content.v1"
 9170         ID:   "content",
10171         Init: func(ic *plugin.InitContext) (interface{}, error) {
11172             return local.NewStore(ic.Root)
12173         },
13174     })
14175     plugin.Register(&plugin.Registration{
15176         Type: plugin.MetadataPlugin,  // "io.containerd.metadata.v1"
16177         ID:   "bolt",
17178         Init: func(ic *plugin.InitContext) (interface{}, error) {
18179             if err := os.MkdirAll(ic.Root, 0711); err != nil {
19180                 return nil, err
20181             }
21182             return bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil)
22183         },
23184     })
24185 
25186     // return the ordered graph for plugins
26187     return plugin.Graph(), nil
27188 }

164行进入plugin包,内部实现是golang从1.8(?)开始支持的新特性--go语言自带的plugin支持, 可以加载用户自定义的插件。

168和175是注册了两个最基本的插件,一个是content插件,一个是metadata插件,这两个插件基本上是其他插件的基础。 其中content插件主要是依赖content/local那个子package,metadata主要是操纵boltdb数据库meta.db

1root@ubuntu:~/gocode/src/github.com/containerd/containerd# ls /var/lib/containerd/
2io.containerd.content.v1.content  io.containerd.runtime.v1.linux      io.containerd.snapshotter.v1.overlayfs
3io.containerd.metadata.v1.bolt    io.containerd.snapshotter.v1.btrfs
4root@ubuntu:~/gocode/src/github.com/containerd/containerd# ls /var/lib/containerd/io.containerd.metadata.v1.bolt/
5meta.db

plugin.Register函数其实很简单,就是把某个Registration结构体加入到plugin包的register全局结构体内:

1 59 var register = struct {
2 60     sync.Mutex
3 61     r []*Registration
4 62 }{}

上面的r即承载着所有的Registration结构体。

上文中提到的loadPlugins最后return plugin.Graph()同样定义在plugin包里, 里面根据Registration.Requires对所有Registration进行了排序, 给出了一个按依赖关系排序的插件数组。比方说plugin a, b, c, 其中b定义了requires a, c定义了requires b, 那么最终给出的排序的插件数组就是[a, b, c] 而不是[b, a, c]或其他。

但是Registration难道只有两个吗?两个插件为什么需要这么复杂? 答案是当然不是只有两个,还有其他的插件,只是他们初始化过程比较隐晦,不是那么直观。

  • 3.2. 其他插件在哪儿?

答案是在以下两个文件中:

 1cmd/containerd/builtins.go:
 2  3 // register containerd builtins here
 3  4 import (
 4  5     _ "github.com/containerd/containerd/differ"
 5  6     _ "github.com/containerd/containerd/services/containers"
 6  7     _ "github.com/containerd/containerd/services/content"
 7  8     _ "github.com/containerd/containerd/services/diff"
 8  9     _ "github.com/containerd/containerd/services/events"
 9 10     _ "github.com/containerd/containerd/services/healthcheck"
10 11     _ "github.com/containerd/containerd/services/images"
11 12     _ "github.com/containerd/containerd/services/namespaces"
12 13     _ "github.com/containerd/containerd/services/snapshot"
13 14     _ "github.com/containerd/containerd/services/tasks"
14 15     _ "github.com/containerd/containerd/services/version"
15 16 )

以及(以linux平台为例,其他平台的文件见其他后缀文件):

1cmd/containerd/builtins_linux.go:
2  3 import (
3  4     _ "github.com/containerd/containerd/linux"
4  5     _ "github.com/containerd/containerd/metrics/cgroups"
5  6     _ "github.com/containerd/containerd/snapshot/overlay"
6  7 )

其中import _ "xxx" 就代表着只执行这个包的init函数,但是不使用这个包的任何函数。

services/content为例:

 1services/content/service.go:
 2 38 func init() {
 3 39     plugin.Register(&plugin.Registration{
 4 40         Type: plugin.GRPCPlugin,  // "io.containerd.grpc.v1"
 5 41         ID:   "content",
 6 42         Requires: []plugin.PluginType{
 7 43             plugin.ContentPlugin,
 8 44             plugin.MetadataPlugin,
 9 45         },
10 46         Init: NewService,
11 47     })
12 48 }

init()函数是golang的基本用法,会在这个package被引用到的时候自动初始化执行。 这里就是注册了另一个plugin. 需要注意的是,plugin.GRPCPlugin这个类型的插件有不止一种,一般都是通过grpc service对外提供服务的。 在上面提到的import的其他包里,你可以找到很多GRPCPlugin类型的插件。 这个插件依赖于plugin.ContentPluginplugin.MetadataPlugin, 也就是说初始化过程中,一定会先初始化它依赖的ContentPlugin和MetadataPlugin再初始化它。 Init函数指向NewService这个函数。这个函数本文后面会继续打开来看,我们先暂停到这里。 到此,我们知道了所有plugin都是在哪里找到的。

  • 3.3. 注册和启动service

继续回到server.New的实现中,loadPlugins完成之后,所有的plugin都加入到plugins这个数组中了, 下一步就是处理这个数组。下面是一段长长的代码:

 1server/server.go:
 2func New(ctx context.Context, config *Config) (*Server, error):
 3 
 4 68     for _, p := range plugins {
 5 69         id := p.URI()  // fmt.Sprintf("%s.%s", r.Type, r.ID)
 6 70         log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
 7 71 
 8 72         initContext := plugin.NewContext(
 9 73             ctx,
10 74             initialized,
11 75             config.Root,  // 默认是"/var/lib/containerd"
12 76             config.State, // 默认是"/run/containerd"
13 77             id,
14 78         )
15 79         initContext.Events = s.events
16 80         initContext.Address = config.GRPC.Address // 默认是"/run/containerd/containerd.sock"
17 81 
18 82         // load the plugin specific configuration if it is provided
19 83         if p.Config != nil {
20 84             pluginConfig, err := config.Decode(p.ID, p.Config)
21 85             if err != nil {
22 86                 return nil, err
23 87             }
24 88             initContext.Config = pluginConfig
25 89         }
26 90         instance, err := p.Init(initContext)
27 91         if err != nil {
28 92             if plugin.IsSkipPlugin(err) {
29 93                 log.G(ctx).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
30 94             } else {
31 95                 log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
32 96             }
33 97             continue
34 98         }
35 99 
36100         if types, ok := initialized[p.Type]; ok {
37101             types[p.ID] = instance
38102         } else {
39103             initialized[p.Type] = map[string]interface{}{
40104                 p.ID: instance,
41105             }
42106         }
43107         // check for grpc services that should be registered with the server
44108         if service, ok := instance.(plugin.Service); ok {
45109             services = append(services, service)
46110         }
47111     }
48112     // register services after all plugins have been initialized
49113     for _, service := range services {
50114         if err := service.Register(rpc); err != nil {
51115             return nil, err
52116         }
53117     }

90行之前都是准备initContext,这个initContext是会传递给每个plugin的Init函数使用的一个初始化数据。 随后重点是90行,会调用每个plugin的Init函数,入参为刚才准备的initContext。 initialized数组每一轮迭代都会把当前初始化完成的插件放进去,然后传递给下一个plugininitContext作为初始化必须的数据, 下一个插件就可以访问它所依赖的任何一个组件了。

108行需要注意的是,每个插件执行完Init函数所返回的instance interface,都会尝试去转换成plugin.Service接口, 如果它实现了plugin.Service这个接口,那么它就是一个service,需要加到services列表, 等待最后在114行执行Register函数进行注册。

1plugin/plugin.go:
2 55 type Service interface {                                               
3 56     Register(*grpc.Server) error
4 57 }   

也即是说,只要instance实现了Register接口,就是一个服务。

仍然以content service为例。

 1services/content/service.go:
 2 38 func init() {
 3 39     plugin.Register(&plugin.Registration{
 4 40         Type: plugin.GRPCPlugin,
 5 41         ID:   "content",
 6 42         Requires: []plugin.PluginType{
 7 43             plugin.ContentPlugin,
 8 44             plugin.MetadataPlugin,
 9 45         },
10 46         Init: NewService,
11 47     })
12 48 }
13 49 
14 50 func NewService(ic *plugin.InitContext) (interface{}, error) {
15 51     c, err := ic.Get(plugin.ContentPlugin)
16 52     if err != nil {
17 53         return nil, err
18 54     }
19 55     m, err := ic.Get(plugin.MetadataPlugin)
20 56     if err != nil {
21 57         return nil, err
22 58     }
23 59     cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store))
24 60     return &Service{
25 61         store:     cs,
26 62         publisher: ic.Events,
27 63     }, nil
28 64 }
29 65 
30 66 func (s *Service) Register(server *grpc.Server) error {
31 67     api.RegisterContentServer(server, s)
32 68     return nil
33 69 }

可以看到content plugin的Init函数返回了content.Service结构体,这个结构体实现了Register函数, 它是一个service。 其中67行会跳转到以下:

 1api/services/content/v1/content.pb.go:
 2 619 // Server API for Content service
 3 620 
 4 621 type ContentServer interface {
 5 622     // Info returns information about a committed object.
 6 623     //
 7 624     // This call can be used for getting the size of content and checking for
 8 625     // existence.
 9 626     Info(context.Context, *InfoRequest) (*InfoResponse, error)
10 627     // Update updates content metadata.
11 628     //
12 629     // This call can be used to manage the mutable content labels. The
13 630     // immutable metadata such as digest, size, and committed at cannot
14 631     // be updated.
15 632     Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
16 633     // List streams the entire set of content as Info objects and closes the
17 634     // stream.
18 635     //
19 636     // Typically, this will yield a large response, chunked into messages.
20 637     // Clients should make provisions to ensure they can handle the entire data
21 638     // set.
22 639     List(*ListContentRequest, Content_ListServer) error
23 640     // Delete will delete the referenced object.
24 641     Delete(context.Context, *DeleteContentRequest) (*google_protobuf3.Empty, error)
25...省略...
26
27 677 func RegisterContentServer(s *grpc.Server, srv ContentServer) {
28 678     s.RegisterService(&_Content_serviceDesc, srv)
29 679 }

也就是content.Service必须是ContentServer的一个实现。

下面一句划重点:

api/services包定义了所有用户自定义的grpc服务的接口,其中.proto文件包含了用户自定义的service接口, 而.pb.go是protoc自动生成的service定义,包含server端和client端定义;services/包里实现了api/services/用户定义的接口

api/services包含的是接口定义,services包是实现。

在上文中services/content/service.go包含了content service的server端实现,而services/content/store.go对client端做了封装, 更加便于使用。

4. 总结

上文对containerd的启动流程做了总结,主要是围绕containerd如何启动多个grpc service给出分析的。grpc可以说是containerd的实现核心, 与docker daemon的http restful API还是有较大不同。后续会针对部分单独的组件再来做分析。

containerd源码分析xmind文件