Containerd Deep Dive: CRI

Digest   Tech   2022-12-28 08:00   Jiangsu


Author | Duan Quanfeng

Editor | zouyee

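Duan Quanfeng: software engineer, familiar with the K8s architecture and an expert in the low-level technical details of container runtimes.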

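Our production K8s clusters have completed the switch of their container runtime from docker to Containerd. Some colleagues are not familiar with the components in the call chain between K8s and Containerd, and they have asked questions such as these. What is the relationship between Containerd and RunC? How do docker and containerd differ? What happens when K8s calls Containerd to create a container, and how does RunC ultimately create it? This article walks through the entire call chain when K8s uses Containerd as its runtime, with analysis down to the source-code level.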


The figure below shows the layered architecture of the kubelet and the container runtime.

For an introduction to the various runtimes, see Containerd Deep Dive: Runtime.


Runtime Overview


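A container runtime manages the entire lifecycle of containers. Concretely, that covers how container images are built and what format they use, how images are managed and distributed, how a container is run, and how the resulting container instances are managed.
The industry standard for container runtimes is the OCI specification, which has two parts:
a. Runtime specification: describes how to run a container from a bundle. A bundle is a directory that holds the container's configuration file, named config.json, and a rootfs. The rootfs contains the operating-system files the container needs at runtime.

b. Image specification: defines how a container image is packaged and how an image is converted into a bundle.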


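Runtimes are commonly divided into low-level and high-level runtimes. A low-level runtime focuses on actually creating containers; examples are runc and kata. A high-level runtime adds more upper-layer features such as image management; docker and containerd are typical examples.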

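The K8s kubelet calls a container runtime to create containers. There are far too many runtimes for K8s to support each one individually, so K8s defines the CRI (Container Runtime Interface) for talking to runtimes, and any runtime that implements this interface can be used. The figures below show the call chains when K8s uses docker and when it uses containerd. With containerd, the CRI is implemented in containerd's own code. With docker, the CRI was implemented in the K8s code base as docker-shim (kubernetes/pkg/kubelet/dockershim/docker_service.go). That code lived in K8s for historical reasons, because at the time docker was the de facto industry standard for containers. As more and more runtimes gained CRI support, maintaining docker-shim became a growing burden on the community. It has therefore been removed from recent K8s releases (starting with v1.24) and is now maintained separately by Mirantis as cri-dockerd. The figure below shows how the plugin evolved.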


Containerd CRI Overview


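Containerd is an industry-standard container runtime. It runs as a daemon and manages the complete lifecycle of the containers on a host, together with their filesystems. This includes image distribution and storage, container execution and monitoring, and the underlying storage and networking.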

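Containerd has several clients, such as K8s and docker. To keep the containers and images of different clients isolated from each other, Containerd has the concept of a namespace. By default, docker uses the moby namespace and K8s uses k8s.io.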

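In Containerd, a container represents only a container's metadata. A Task takes a container object and turns it into a process that can run in the operating system, so a Task represents the runnable object inside the container.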

The cri module inside Containerd implements the K8s CRI, so the K8s kubelet can use containerd directly. The CRI consists of two services: RuntimeService and ImageService.

// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {
    // Version returns the runtime name, runtime version, and runtime API version.
    rpc Version(VersionRequest) returns (VersionResponse) {}

    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
    // the sandbox is in the ready state on success.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    // StopPodSandbox stops any running process that is part of the sandbox and
    // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
    // If there are any running containers in the sandbox, they must be forcibly
    // terminated.
    // This call is idempotent, and must not return an error if all relevant
    // resources have already been reclaimed. kubelet will call StopPodSandbox
    // at least once before calling RemovePodSandbox. It will also attempt to
    // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
    // multiple StopPodSandbox calls are expected.
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    // RemovePodSandbox removes the sandbox. If there are any running containers
    // in the sandbox, they must be forcibly terminated and removed.
    // This call is idempotent, and must not return an error if the sandbox has
    // already been removed.
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
    // present, returns an error.
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    // ListPodSandbox returns a list of PodSandboxes.
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // CreateContainer creates a new container in specified PodSandbox
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    // StartContainer starts the container.
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    // StopContainer stops a running container with a grace period (i.e., timeout).
    // This call is idempotent, and must not return an error if the container has
    // already been stopped.
    // The runtime must forcibly kill the container after the grace period is
    // reached.
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    // RemoveContainer removes the container. If the container is running, the
    // container must be forcibly removed.
    // This call is idempotent, and must not return an error if the container has
    // already been removed.
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    // ListContainers lists all containers by filters.
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    // ContainerStatus returns status of the container. If the container is not
    // present, returns an error.
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    // UpdateContainerResources updates ContainerConfig of the container synchronously.
    // If runtime fails to transactionally update the requested resources, an error is returned.
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
    // for the container. This is often called after the log file has been
    // rotated. If the container is not running, container runtime can choose
    // to either create a new log file and return nil, or return an error.
    // Once it returns error, new container log file MUST NOT be created.
    rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

    // ExecSync runs a command in a container synchronously.
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    // Exec prepares a streaming endpoint to execute a command in the container.
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    // Attach prepares a streaming endpoint to attach to a running container.
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    // ContainerStats returns stats of the container. If the container does not
    // exist, the call returns an error.
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    // ListContainerStats returns stats of all running containers.
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    // PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
    // exist, the call returns an error.
    rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
    // ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
    rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}

    // UpdateRuntimeConfig updates the runtime configuration based on the given request.
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

    // Status returns the status of the runtime.
    rpc Status(StatusRequest) returns (StatusResponse) {}
}

// ImageService defines the public APIs for managing images.
service ImageService {
    // ListImages lists existing images.
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    // ImageStatus returns the status of the image. If the image is not
    // present, returns a response with ImageStatusResponse.Image set to
    // nil.
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    // PullImage pulls an image with authentication config.
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    // RemoveImage removes the image.
    // This call is idempotent, and must not return an error if the image has
    // already been removed.
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    // ImageFSInfo returns information of the filesystem that is used to store images.
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}
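The proto comments require several calls to be idempotent: StopPodSandbox, RemovePodSandbox, StopContainer, RemoveContainer and RemoveImage. ContainerStatus, by contrast, must return an error for a missing container. The following is a minimal sketch of what that contract means for an implementation. It uses a hypothetical in-memory container table with plain string ids instead of the real protobuf messages, so it is not containerd's code. Removing an already-removed container succeeds, which lets the kubelet retry safely.

```go
package main

import (
	"fmt"
	"sync"
)

// containerTable is a hypothetical in-memory stand-in for a runtime's
// container store; it is not containerd's implementation.
type containerTable struct {
	mu         sync.Mutex
	containers map[string]string // id -> state
}

func newContainerTable() *containerTable {
	return &containerTable{containers: map[string]string{}}
}

func (c *containerTable) Add(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.containers[id] = "running"
}

// RemoveContainer follows the CRI contract: a running container is forcibly
// removed, and removing a container that no longer exists is not an error.
func (c *containerTable) RemoveContainer(id string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.containers[id]; !ok {
		return nil // already removed: idempotent success
	}
	// A real runtime would kill the process and release resources here.
	delete(c.containers, id)
	return nil
}

// ContainerStatus must return an error when the container is not present.
func (c *containerTable) ContainerStatus(id string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	state, ok := c.containers[id]
	if !ok {
		return "", fmt.Errorf("container %q not found", id)
	}
	return state, nil
}

func main() {
	tbl := newContainerTable()
	tbl.Add("a")
	fmt.Println(tbl.RemoveContainer("a"), tbl.RemoveContainer("a")) // <nil> <nil>
	_, err := tbl.ContainerStatus("a")
	fmt.Println(err)
}
```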

The kubelet calls the CRI to create a Pod with two application containers, A and B, in the following order:

① Create the sandbox for the Pod

② Create container A

③ Start container A

④ Create container B

⑤ Start container B


Containerd CRI Implementation


RunPodSandbox


RunPodSandbox works as follows:

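① Pull the sandbox image, which is configured in containerd

② Determine the runtime to use for the pod. It can be specified in the pod's yaml; if it is not, containerd's default runtime is used

(the runtimes are configured in containerd)

③ If the pod does not use hostNetwork, create a new net namespace and set up networking with the CNI plugin (at initialization, criService loads the CNI plugin settings specified in containerd's cri configuration)

④ Call the containerd client to create a container

⑤ Under rootDir/io.containerd.grpc.v1.cri/sandboxes, create a directory for the pod, named after the pod id

(pkg/cri/cri.go)

⑥ Create a task for the sandbox container using the selected runtime

⑦ Start the sandbox container's task and add the sandbox to the store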
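Step ② is essentially a lookup with a fallback. The sketch below assumes a hypothetical map of configured runtimes keyed by handler name. In a real cluster the handler comes from the pod's RuntimeClass and the runtimes are declared in containerd's CRI plugin configuration. An empty handler falls back to the default, and an unknown handler is an error. The "runc"/"kata" entries are example configuration only.

```go
package main

import "fmt"

// ociRuntime is a hypothetical, trimmed-down runtime config entry.
type ociRuntime struct {
	Type string // e.g. "io.containerd.runc.v2"
}

// runtimeConfig is a hypothetical stand-in for the runtimes section of
// containerd's CRI plugin configuration.
type runtimeConfig struct {
	DefaultRuntimeName string
	Runtimes           map[string]ociRuntime
}

// sandboxRuntime picks the runtime for a pod: the requested handler if set,
// otherwise the configured default.
func sandboxRuntime(cfg runtimeConfig, handler string) (ociRuntime, error) {
	if handler == "" {
		handler = cfg.DefaultRuntimeName
	}
	r, ok := cfg.Runtimes[handler]
	if !ok {
		return ociRuntime{}, fmt.Errorf("no runtime for %q is configured", handler)
	}
	return r, nil
}

func main() {
	cfg := runtimeConfig{
		DefaultRuntimeName: "runc",
		Runtimes: map[string]ociRuntime{
			"runc": {Type: "io.containerd.runc.v2"},
			"kata": {Type: "io.containerd.kata.v2"},
		},
	}
	r, _ := sandboxRuntime(cfg, "")
	fmt.Println(r.Type) // io.containerd.runc.v2
	r, _ = sandboxRuntime(cfg, "kata")
	fmt.Println(r.Type) // io.containerd.kata.v2
}
```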

The code is in containerd/pkg/cri/server/sandbox_run.go:

// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) {
	config := r.GetConfig()
	log.G(ctx).Debugf("Sandbox config %+v", config)

	// Generate unique id and name for the sandbox and reserve the name.
	id := util.GenerateID()
	metadata := config.GetMetadata()
	if metadata == nil {
		return nil, errors.New("sandbox config must include metadata")
	}
	name := makeSandboxName(metadata)
	log.G(ctx).WithField("podsandboxid", id).Debugf("generated id for sandbox name %q", name)

	// cleanupErr records the last error returned by the critical cleanup operations in deferred functions,
	// like CNI teardown and stopping the running sandbox task.
	// If cleanup is not completed for some reason, the CRI-plugin will leave the sandbox
	// in a not-ready state, which can later be cleaned up by the next execution of the kubelet's syncPod workflow.
	var cleanupErr error

	// Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the
	// same sandbox.
	if err := c.sandboxNameIndex.Reserve(name, id); err != nil {
		return nil, fmt.Errorf("failed to reserve sandbox name %q: %w", name, err)
	}
	defer func() {
		// Release the name if the function returns with an error.
		// When cleanupErr != nil, the name will be cleaned in sandbox_remove.
		if retErr != nil && cleanupErr == nil {
			c.sandboxNameIndex.ReleaseByName(name)
		}
	}()

	var (
		err         error
		sandboxInfo = sb.Sandbox{ID: id}
	)

	ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler())
	if err != nil {
		return nil, fmt.Errorf("unable to get OCI runtime for sandbox %q: %w", id, err)
	}

	sandboxInfo.Runtime.Name = ociRuntime.Type

	// Retrieve runtime options
	runtimeOpts, err := generateRuntimeOptions(ociRuntime, c.config)
	if err != nil {
		return nil, fmt.Errorf("failed to generate sandbox runtime options: %w", err)
	}

	...

	// Create initial internal sandbox object.
	sandbox := sandboxstore.NewSandbox(
		...
	)

	if _, err := c.client.SandboxStore().Create(ctx, sandboxInfo); err != nil {
		return nil, fmt.Errorf("failed to save sandbox metadata: %w", err)
	}
	...

	// Setup the network namespace if host networking wasn't requested.
	if !hostNetwork(config) {
		netStart := time.Now()
		// If it is not in host network namespace then create a namespace and set the sandbox
		// handle. NetNSPath in sandbox metadata and NetNS is non empty only for non host network
		// namespaces. If the pod is in host network namespace then both are empty and should not
		// be used.
		var netnsMountDir = "/var/run/netns"
		if c.config.NetNSMountsUnderStateDir {
			netnsMountDir = filepath.Join(c.config.StateDir, "netns")
		}
		sandbox.NetNS, err = netns.NewNetNS(netnsMountDir)
		if err != nil {
			return nil, fmt.Errorf("failed to create network namespace for sandbox %q: %w", id, err)
		}
		// Update network namespace in the store, which is used to generate the container's spec
		sandbox.NetNSPath = sandbox.NetNS.GetPath()
		defer func() {
			// Remove the network namespace only if all the resource cleanup is done
			if retErr != nil && cleanupErr == nil {
				if cleanupErr = sandbox.NetNS.Remove(); cleanupErr != nil {
					log.G(ctx).WithError(cleanupErr).Errorf("Failed to remove network namespace %s for sandbox %q", sandbox.NetNSPath, id)
					return
				}
				sandbox.NetNSPath = ""
			}
		}()

		if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
			return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
		}
		// Save sandbox metadata to store
		if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
			return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
		}

		// Define this defer to teardownPodNetwork prior to the setupPodNetwork function call.
		// This is because in setupPodNetwork the resource is allocated even if it returns error, unlike other resource
		// creation functions.
		defer func() {
			// Remove the network namespace only if all the resource cleanup is done.
			if retErr != nil && cleanupErr == nil {
				deferCtx, deferCancel := ctrdutil.DeferContext()
				defer deferCancel()
				// Teardown network if an error is returned.
				if cleanupErr = c.teardownPodNetwork(deferCtx, sandbox); cleanupErr != nil {
					log.G(ctx).WithError(cleanupErr).Errorf("Failed to destroy network for sandbox %q", id)
				}
			}
		}()

		// Setup network for sandbox.
		// Certain VM based solutions like clear containers (Issue containerd/cri-containerd#524)
		// rely on the assumption that CRI shim will not be querying the network namespace to check the
		// network states such as IP.
		// In future runtime implementation should avoid relying on CRI shim implementation details.
		// In this case however caching the IP will add a subtle performance enhancement by avoiding
		// calls to network namespace of the pod to query the IP of the veth interface on every
		// SandboxStatus request.
		if err := c.setupPodNetwork(ctx, &sandbox); err != nil {
			return nil, fmt.Errorf("failed to setup network for sandbox %q: %w", id, err)
		}
		sandboxCreateNetworkTimer.UpdateSince(netStart)
	}

	if err := sandboxInfo.AddExtension(podsandbox.MetadataKey, &sandbox.Metadata); err != nil {
		return nil, fmt.Errorf("unable to save sandbox %q to store: %w", id, err)
	}

	controller, err := c.getSandboxController(config, r.GetRuntimeHandler())
	if err != nil {
		return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
	}

	// Save sandbox metadata to store
	if sandboxInfo, err = c.client.SandboxStore().Update(ctx, sandboxInfo, "extensions"); err != nil {
		return nil, fmt.Errorf("unable to update extensions for sandbox %q: %w", id, err)
	}

	runtimeStart := time.Now()

	if err := controller.Create(ctx, id); err != nil {
		return nil, fmt.Errorf("failed to create sandbox %q: %w", id, err)
	}

	resp, err := controller.Start(ctx, id)
	if err != nil {
		sandbox.Container, _ = c.client.LoadContainer(ctx, id)
		if resp != nil && resp.SandboxID == "" && resp.Pid == 0 && resp.CreatedAt == nil && len(resp.Labels) == 0 {
			// if resp is a non-nil zero-value, an error occurred during cleanup
			cleanupErr = fmt.Errorf("failed to cleanup sandbox")
		}
		return nil, fmt.Errorf("failed to start sandbox %q: %w", id, err)
	}

	// TODO: get rid of this. sandbox object should no longer have Container field.
	if ociRuntime.SandboxMode == string(criconfig.ModePodSandbox) {
		container, err := c.client.LoadContainer(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("failed to load container %q for sandbox: %w", id, err)
		}
		sandbox.Container = container
	}

	labels := resp.GetLabels()
	if labels == nil {
		labels = map[string]string{}
	}

	sandbox.ProcessLabel = labels["selinux_label"]

	if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
		// Set the pod sandbox as ready after successfully start sandbox container.
		status.Pid = resp.Pid
		status.State = sandboxstore.StateReady
		status.CreatedAt = protobuf.FromTimestamp(resp.CreatedAt)
		return status, nil
	}); err != nil {
		return nil, fmt.Errorf("failed to update sandbox status: %w", err)
	}

	// Add sandbox into sandbox store in INIT state.
	if err := c.sandboxStore.Add(sandbox); err != nil {
		return nil, fmt.Errorf("failed to add sandbox %+v into store: %w", sandbox, err)
	}

	// Send CONTAINER_CREATED event with both ContainerId and SandboxId equal to SandboxId.
	// Note that this has to be done after sandboxStore.Add() because we need to get
	// SandboxStatus from the store and include it in the event.
	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_CREATED_EVENT)

	// start the monitor after adding sandbox into the store, this ensures
	// that sandbox is in the store, when event monitor receives the TaskExit event.
	//
	// TaskOOM from containerd may come before sandbox is added to store,
	// but we don't care about sandbox TaskOOM right now, so it is fine.
	go func() {
		resp, err := controller.Wait(ctrdutil.NamespacedContext(), id)
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to wait for sandbox controller, skipping exit event")
			return
		}

		e := &eventtypes.TaskExit{
			ContainerID: id,
			ID:          id,
			// Pid is not used
			Pid:        0,
			ExitStatus: resp.ExitStatus,
			ExitedAt:   resp.ExitedAt,
		}
		c.eventMonitor.backOff.enBackOff(id, e)
	}()

	// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
	c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)

	sandboxRuntimeCreateTimer.WithValues(labels["oci_runtime_type"]).UpdateSince(runtimeStart)

	return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}

CreateContainer

CreateContainer creates the metadata for a new container in the specified PodSandbox. It works as follows:

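① Get the sandbox information for the container

② Initialize an image handler for the image the container will use

③ Create a directory for the container under rootDir/io.containerd.grpc.v1.cri, named after the container id

④ Get the runtime used by the sandbox

⑤ Build the containerSpec for the container

⑥ Create the container with the containerd client

⑦ Save the container's information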

The code is in containerd/pkg/cri/server/container_create.go. An abridged version follows.

func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (_ *runtime.CreateContainerResponse, retErr error) {
	sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
	s, err := sandbox.Container.Task(ctx, nil)
	sandboxPid := s.Pid()
	image, err := c.localResolve(config.GetImage().GetImage())
	if err != nil {
		return nil, errors.Wrapf(err, "failed to resolve image %q", config.GetImage().GetImage())
	}
	containerdImage, err := c.toContainerdImage(ctx, image)
	// Run container using the same runtime with sandbox.
	sandboxInfo, err := sandbox.Container.Info(ctx)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get sandbox %q info", sandboxID)
	}

	// Create container root directory.
	containerRootDir := c.getContainerRootDir(id)
	if err = c.os.MkdirAll(containerRootDir, 0755); err != nil {
		return nil, errors.Wrapf(err, "failed to create container root directory %q", containerRootDir)
	}
	ociRuntime, err := c.getSandboxRuntime(sandboxConfig, sandbox.Metadata.RuntimeHandler)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get sandbox runtime")
	}

	spec, err := c.containerSpec(id, sandboxID, sandboxPid, sandbox.NetNSPath, containerName, containerdImage.Name(), config, sandboxConfig,
		&image.ImageSpec.Config, append(mounts, volumeMounts...), ociRuntime)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to generate container %q spec", id)
	}
	opts = append(opts,
		containerd.WithSpec(spec, specOpts...),
		containerd.WithRuntime(sandboxInfo.Runtime.Name, runtimeOptions),
		containerd.WithContainerLabels(containerLabels),
		containerd.WithContainerExtension(containerMetadataExtension, &meta))
	var cntr containerd.Container
	if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
		return nil, errors.Wrap(err, "failed to create containerd container")
	}
	......
	// Add container into container store.
	if err := c.containerStore.Add(container); err != nil {
		return nil, errors.Wrapf(err, "failed to add container %q into store", id)
	}
	......
}

StartContainer

StartContainer starts a container. It works as follows:

① Read the saved container metadata
② Read the associated sandbox information
③ Create a task for the container
④ Start the task

The code is in containerd/pkg/cri/server/container_start.go. An abridged version follows:

func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) {
	cntr, err := c.containerStore.Get(r.GetContainerId())
	......
	// Get sandbox config from sandbox store.
	sandbox, err := c.sandboxStore.Get(meta.SandboxID)
	ctrInfo, err := container.Info(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get container info")
	}

	taskOpts := c.taskOpts(ctrInfo.Runtime.Name)
	task, err := container.NewTask(ctx, ioCreation, taskOpts...)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create containerd task")
	}
	// wait is a long running background request, no timeout needed.
	exitCh, err := task.Wait(ctrdutil.NamespacedContext())
	// Start containerd task.
	if err := task.Start(ctx); err != nil {
		return nil, errors.Wrapf(err, "failed to start containerd task %q", id)
	}
	......
}
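StartContainer calls task.Wait before task.Start. containerd's getting-started guide recommends the same order, to avoid a race with processes that exit very quickly. The sketch below is a hypothetical model of that race, not containerd's event machinery. fakeTask's process exits immediately on Start, and its exit status is delivered only to waiters that were already registered.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeTask is a hypothetical task whose process exits as soon as it starts.
// Exit notifications go only to waiters registered at exit time, which
// models the race that calling Wait before Start avoids.
type fakeTask struct {
	mu      sync.Mutex
	waiters []chan uint32
}

// Wait registers a waiter and returns a channel that receives the exit status.
func (t *fakeTask) Wait() <-chan uint32 {
	ch := make(chan uint32, 1) // buffered so the exit sender never blocks
	t.mu.Lock()
	t.waiters = append(t.waiters, ch)
	t.mu.Unlock()
	return ch
}

// Start "runs" the process, which exits immediately with status 0.
func (t *fakeTask) Start() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.waiters {
		ch <- 0
	}
	t.waiters = nil
}

// startAndWait uses the safe order: register the waiter, then start.
func startAndWait(t *fakeTask) uint32 {
	exitCh := t.Wait()
	t.Start()
	return <-exitCh
}

func main() {
	fmt.Println("exit status:", startAndWait(&fakeTask{}))

	// Wrong order: the exit happens before anyone is waiting.
	late := &fakeTask{}
	late.Start()
	select {
	case s := <-late.Wait():
		fmt.Println("unexpected:", s)
	default:
		fmt.Println("exit missed")
	}
}
```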

The code that creates the task is shown below. It uses containerd's client-side TasksClient to send a create-task request to the server:

func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
	......
	request := &tasks.CreateTaskRequest{
		ContainerID: c.id,
		Terminal:    cfg.Terminal,
		Stdin:       cfg.Stdin,
		Stdout:      cfg.Stdout,
		Stderr:      cfg.Stderr,
	}
	......
	response, err := c.client.TaskService().Create(ctx, request)
	......
}


The code that starts the task is shown below. It uses containerd's client-side TasksClient to send a start-task request to the server:

func (t *task) Start(ctx context.Context) error {
	r, err := t.client.TaskService().Start(ctx, &tasks.StartRequest{
		ContainerID: t.id,
	})
	if err != nil {
		if t.io != nil {
			t.io.Cancel()
			t.io.Close()
		}
		return errdefs.FromGRPC(err)
	}
	t.pid = r.Pid
	return nil
}





Task Service


How the Task Service creates a task

Below is the code with which the tasks service handles a create-task request. It creates the container through the container's runtime.

func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	......
	rtime, err := l.getRuntime(container.Runtime.Name)
	if err != nil {
		return nil, err
	}
	_, err = rtime.Get(ctx, r.ContainerID)
	if err != nil && err != runtime.ErrTaskNotExists {
		return nil, errdefs.ToGRPC(err)
	}
	if err == nil {
		return nil, errdefs.ToGRPC(fmt.Errorf("task %s already exists", r.ContainerID))
	}
	c, err := rtime.Create(ctx, r.ContainerID, opts)
	......
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         c.PID(),
	}, nil
}
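Before creating anything, the tasks service asks the runtime whether a task already exists, and the result falls into three cases. A not-exists error means it may proceed, any other error is returned, and a successful lookup means the task would be a duplicate. The sketch below shows that three-way check in isolation. taskList is a hypothetical in-memory registry, and errTaskNotExists stands in for runtime.ErrTaskNotExists.

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskNotExists stands in for runtime.ErrTaskNotExists.
var errTaskNotExists = errors.New("task does not exist")

// taskList is a hypothetical in-memory registry of task PIDs keyed by container id.
type taskList struct{ tasks map[string]int }

func (l *taskList) Get(id string) (int, error) {
	pid, ok := l.tasks[id]
	if !ok {
		return 0, errTaskNotExists
	}
	return pid, nil
}

// create refuses to create a second task for the same container.
func (l *taskList) create(id string, pid int) error {
	_, err := l.Get(id)
	if err != nil && !errors.Is(err, errTaskNotExists) {
		return err // unexpected lookup failure
	}
	if err == nil {
		return fmt.Errorf("task %s already exists", id)
	}
	l.tasks[id] = pid
	return nil
}

func main() {
	l := &taskList{tasks: map[string]int{}}
	fmt.Println(l.create("ctr-1", 100)) // <nil>
	fmt.Println(l.create("ctr-1", 200)) // task ctr-1 already exists
}
```

The sketch uses errors.Is rather than the == comparison in the excerpt above, because errors.Is also matches wrapped errors.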
The runtime's container-creation code is shown below. It starts the shim and sends the create request to it.
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
	......
	shim, err := m.startShim(ctx, bundle, id, opts)

	t, err := shim.Create(ctx, opts)
	......
}

startShim runs the shim executable to start a service:

func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
	......
	b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
	shim, err := b.Start(ctx, topts, func() {
		log.G(ctx).WithField("id", id).Info("shim disconnected")
		cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
		m.tasks.Delete(ctx, id)
	})
	......
}

The shim command runs an executable named containerd-shim-<runtime>-<version>. For example, the commonly used runtime type io.containerd.runc.v2 maps to the executable containerd-shim-runc-v2. The full command format is:

containerd-shim-runc-v2 -namespace xxxx -address xxxx -publish-binary xxxx -id xxxx start
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
	args := []string{"-id", b.bundle.ID}
	args = append(args, "start")

	cmd, err := client.Command(
		ctx,
		b.runtime,
		b.containerdAddress,
		b.containerdTTRPCAddress,
		b.bundle.Path,
		opts,
		args...,
	)
	......
	out, err := cmd.CombinedOutput()
	if err != nil {
		return nil, errors.Wrapf(err, "%s", out)
	}
	address := strings.TrimSpace(string(out))
	conn, err := client.Connect(address, client.AnonDialer)
	if err != nil {
		return nil, err
	}
	onCloseWithShimLog := func() {
		onClose()
		cancelShimLog()
		f.Close()
	}
	client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
	return &shim{
		bundle: b.bundle,
		client: client,
	}, nil
}
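Under the naming rule above, the binary name is built from the last two dot-separated parts of the runtime type. A sketch of that rule follows. containerd ships its own helper for this in its shim package; shimBinaryName here is a hypothetical reimplementation. Treating a runtime type with fewer than two parts as invalid (empty result) is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// shimBinaryName turns a runtime type such as "io.containerd.runc.v2" into
// the shim executable name "containerd-shim-runc-v2".
// It returns "" when the runtime type has fewer than two dot-separated parts.
func shimBinaryName(runtime string) string {
	parts := strings.Split(runtime, ".")
	if len(parts) < 2 {
		return ""
	}
	return fmt.Sprintf("containerd-shim-%s-%s", parts[len(parts)-2], parts[len(parts)-1])
}

func main() {
	fmt.Println(shimBinaryName("io.containerd.runc.v2")) // containerd-shim-runc-v2
	fmt.Println(shimBinaryName("io.containerd.kata.v2")) // containerd-shim-kata-v2
}
```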
How the Task Service starts a task

Below is the code with which the tasks service starts a task:

func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) {
	t, err := l.getTask(ctx, r.ContainerID)
	if err != nil {
		return nil, err
	}
	p := runtime.Process(t)
	if r.ExecID != "" {
		if p, err = t.Process(ctx, r.ExecID); err != nil {
			return nil, errdefs.ToGRPC(err)
		}
	}
	if err := p.Start(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	state, err := p.State(ctx)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return &api.StartResponse{
		Pid: state.Pid,
	}, nil
}

The container process is started by sending a request to the shim's server side:

func (s *shim) Start(ctx context.Context) error {
	response, err := s.task.Start(ctx, &task.StartRequest{
		ID: s.ID(),
	})
	if err != nil {
		return errdefs.FromGRPC(err)
	}
	s.taskPid = int(response.Pid)
	return nil
}


Containerd-shim Startup Flow


The shim's entry point is in containerd/runtime/v2/shim/shim.go:

RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts)

The code path for containerd-shim-runc-v2 start:

case "start":
	opts := StartOpts{
		Address:      addressFlag,
		TTRPCAddress: ttrpcAddress,
		Debug:        debugFlag,
	}

	address, err := manager.Start(ctx, id, opts)
	if err != nil {
		return err
	}
	if _, err := os.Stdout.WriteString(address); err != nil {
		return err
	}
	return nil
}

The containerd-shim-runc-v2 start process in turn spawns another process, containerd-shim-runc-v2 -namespace xxxx -id xxxx -address xxxx, which runs the shim server.

func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) {
	cmd, err := newCommand(ctx, id, opts.Address, opts.TTRPCAddress, opts.Debug)
	...
	// make sure that reexec shim-v2 binary use the value if need
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
	...
	if err := cmd.Start(); err != nil {
		f.Close()
		return "", err
	}
	...
	// make sure to wait after start
	go cmd.Wait()
	...
}

// ...... (the server setup below runs in the re-executed shim process launched by cmd.Start above)
	server, err := newServer(ttrpc.WithUnaryServerInterceptor(unaryInterceptor))
	if err != nil {
		return fmt.Errorf("failed creating server: %w", err)
	}

	for _, srv := range ttrpcServices {
		if err := srv.RegisterTTRPC(server); err != nil {
			return fmt.Errorf("failed to register service: %w", err)
		}
	}

	if err := serve(ctx, server, signals, sd.Shutdown); err != nil {
		if err != shutdown.ErrShutdown {
			return err
		}
	}

The shim server is a ttrpc service that exposes the following interface:

type TaskService interface {
	State(context.Context, *StateRequest) (*StateResponse, error)
	Create(context.Context, *CreateTaskRequest) (*CreateTaskResponse, error)
	Start(context.Context, *StartRequest) (*StartResponse, error)
	Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
	Pids(context.Context, *PidsRequest) (*PidsResponse, error)
	Pause(context.Context, *PauseRequest) (*emptypb.Empty, error)
	Resume(context.Context, *ResumeRequest) (*emptypb.Empty, error)
	Checkpoint(context.Context, *CheckpointTaskRequest) (*emptypb.Empty, error)
	Kill(context.Context, *KillRequest) (*emptypb.Empty, error)
	Exec(context.Context, *ExecProcessRequest) (*emptypb.Empty, error)
	ResizePty(context.Context, *ResizePtyRequest) (*emptypb.Empty, error)
	CloseIO(context.Context, *CloseIORequest) (*emptypb.Empty, error)
	Update(context.Context, *UpdateTaskRequest) (*emptypb.Empty, error)
	Wait(context.Context, *WaitRequest) (*WaitResponse, error)
	Stats(context.Context, *StatsRequest) (*StatsResponse, error)
	Connect(context.Context, *ConnectRequest) (*ConnectResponse, error)
	Shutdown(context.Context, *ShutdownRequest) (*emptypb.Empty, error)
}
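The toy sketch below makes the Create/Start/State part of that interface concrete. It is a hypothetical service that tracks each task's status (created, then running) without invoking runc. In the real runc shim, Create runs `runc create` and Start runs `runc start`, as described below. The simplified method signatures, which take plain ids instead of ttrpc request messages, are assumptions of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type status string

const (
	statusCreated status = "created"
	statusRunning status = "running"
)

// toyShim is a hypothetical stand-in for a shim's TaskService.
type toyShim struct {
	mu    sync.Mutex
	tasks map[string]status
}

func newToyShim() *toyShim { return &toyShim{tasks: map[string]status{}} }

// Create registers a task in the created state (real shim: runc create).
func (s *toyShim) Create(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.tasks[id]; ok {
		return fmt.Errorf("task %s already exists", id)
	}
	s.tasks[id] = statusCreated
	return nil
}

// Start moves a created task to running (real shim: runc start).
func (s *toyShim) Start(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	st, ok := s.tasks[id]
	if !ok {
		return fmt.Errorf("task %s not found", id)
	}
	if st != statusCreated {
		return fmt.Errorf("task %s is %s, not %s", id, st, statusCreated)
	}
	s.tasks[id] = statusRunning
	return nil
}

// State reports the current status of a task.
func (s *toyShim) State(id string) (status, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	st, ok := s.tasks[id]
	if !ok {
		return "", fmt.Errorf("task %s not found", id)
	}
	return st, nil
}

func main() {
	s := newToyShim()
	_ = s.Create("ctr-1")
	_ = s.Start("ctr-1")
	st, _ := s.State("ctr-1")
	fmt.Println(st) // running
}
```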

Creating a task runs the command runc create --bundle xxxx xxxx. See the code:

func (r *Runc) Create(context context.Context, id, bundle string, opts *CreateOpts) error {
	args := []string{"create", "--bundle", bundle}
	......
	cmd := r.command(context, append(args, id)...)
	......
	ec, err := Monitor.Start(cmd)
	......
}

Starting a task runs the command runc start xxxx. See the code:

func (r *Runc) Start(context context.Context, id string) error {
	return r.runOrError(r.command(context, "start", id))
}

Summary

The flow by which the kubelet creates a sandbox can be summarized as follows:

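① containerd's cri module creates the sandbox metadata and saves it
② containerd's cri module creates the sandbox container and saves it
③ containerd's cri module calls tasks-service over gRPC to create the task
④ the tasks-service module launches the containerd-shim-xxxx-xxxx start process
⑤ the containerd-shim-xxxx-xxxx start process spawns the containerd-shim-xxxx-xxxx process and exits
⑥ the containerd-shim-xxxx-xxxx process starts the shim server, which provides the ttrpc service
⑦ the tasks-service module calls the shim server's Create interface to create the task, and the shim server runs the runc create command
⑧ containerd's cri module calls tasks-service over gRPC to start the task
⑨ the tasks-service module calls the shim server's Start interface to start the task, and the shim server runs the runc start command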



