图解kubernetes命令执行核心实现

仔仔 · · 615 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

K8s中的命令执行由apiserver、kubelet、cri、docker等组件共同完成, 其中最复杂的就是协议切换以及各种流拷贝相关,让我们一起来看下关键实现,虽然代码比较多,但是不会开发应该也能看懂,祝你好运

1. 基础概念

K8s中的命令执行中有很多协议相关的处理, 我们先一起看下这些协议处理相关的基础概念

1.1 Http协议中的Connection与Upgrade

image.png

HTTP/1.1中允许在同一个链接上通过Header头中的Connection配合Upgrade来实现协议的转换,简单来说就是允许在通过HTTP建立的链接之上使用其他的协议来进行通信,这也是k8s中命令中实现协议升级的关键

1.2 Http协议中的101状态码

image.png

在HTTP协议中除了我们常见的HTTP1.1,还支持websocket/spdy等协议,那服务端和客户端如何在http之上完成不同协议的切换呢,首先第一个要素就是这里的101(Switching Protocal)状态码, 即服务端告知客户端我们切换到Uprage定义的协议上来进行通信(复用当前链接)

1.3 SPDY协议中的stream

image.png

SPDY协议是google开发的TCP会话层协议, SPDY协议中将Http的Request/Response称为Stream,并支持TCP的链接复用,同时多个stream之间通过Stream-id来进行标记,简单来说就是支持在单个链接同时进行多个请求响应的处理,并且互不影响,k8s中的命令执行主要也就是通过stream来进行消息传递的

1.4 文件描述符重定向

image.png

在Linux中进程执行通常都会包含三个FD:标准输入、标准输出、标准错误, k8s中的命令执行会将对应的FD进行重定向,从而获取容器的命令的输出,重定向到哪呢?当然是我们上面提到过的stream了(因为对docker并不熟悉,所以这个地方并不保证Docker部分的准确性)

1.5 http中的Hijacker

image.png

在client与server之间通过101状态码、connection、upragde等完成基于当前链接的转换之后, 当前链接上传输的数据就不在是之前的http1.1协议了,此时就要将对应的http链接转成对应的协议进行转换,在k8s命令执行的过程中,会获取将对应的request和response,都通过http的Hijacker接口获取底层的tcp链接,从而继续完成请求的转发

1.6 基于tcp的流对拷的转发

在通过Hijacker获取到两个底层的tcp的readerwriter之后,就可以直接通过io.copy在两个流上完成对应数据的拷贝,这样就不需要在apiserver这个地方进行协议的转换,而是直接通过tcp的流对拷就可以实现请求和结果的转发

基础大概就介绍这些,接下来我们一起去看看其底层的具体实现,我们从kubectl部分开始来逐层分析

2.kubectl

Kubectl执行命令主要分为两部分Pod合法性检测和命令执行, Pod合法性检测主要是获取对应Pod的状态,检测是否在运行, 这里我们重点关注下命令执行部分

2.1 命令执行核心流程

image.png

命令执行的核心分为两个步骤:1.通过SPDY协议建立链接 2)构建Stream建立链接

func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
    exec, err := remotecommand.NewSPDYExecutor(config, method, url)
    if err != nil {
        return err
    }
    return exec.Stream(remotecommand.StreamOptions{
        Stdin:             stdin,
        Stdout:            stdout,
        Stderr:            stderr,
        Tty:               tty,
        TerminalSizeQueue: terminalSizeQueue,
    })
}

2.2 exec请求构建

我们可以看到这个地方拼接的Url /pods/{namespace}/{podName}/exec其实就是对应apiserver上面pod的subresource接口,然后我们就可以去看apiserver端的请求处理了

    // 创建一个exec
        req := restClient.Post().
            Resource("pods").
            Name(pod.Name).
            Namespace(pod.Namespace).
            SubResource("exec")
        req.VersionedParams(&corev1.PodExecOptions{
            Container: containerName,
            Command:   p.Command,
            Stdin:     p.Stdin,
            Stdout:    p.Out != nil,
            Stderr:    p.ErrOut != nil,
            TTY:       t.Raw,
        }, scheme.ParameterCodec)
return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)

2.3 建立Stream

image.png

在exec.Stream主要是通过Headers传递要建立的Stream的类型,与server端进行协商

    // set up stdin stream
    if p.Stdin != nil {
        headers.Set(v1.StreamType, v1.StreamTypeStdin)
        p.remoteStdin, err = conn.CreateStream(headers)
        if err != nil {
            return err
        }
    }

    // set up stdout stream
    if p.Stdout != nil {
        headers.Set(v1.StreamType, v1.StreamTypeStdout)
        p.remoteStdout, err = conn.CreateStream(headers)
        if err != nil {
            return err
        }
    }

    // set up stderr stream
    if p.Stderr != nil && !p.Tty {
        headers.Set(v1.StreamType, v1.StreamTypeStderr)
        p.remoteStderr, err = conn.CreateStream(headers)
        if err != nil {
            return err
        }
    }

3.APIServer

APIServer在命令执行的过程中扮演了代理的角色,其负责将Kubectl和kubelet之间的请求来进行转发,注意这个转发主要是基于tcp的流对拷完成的,因为kubectl和kubelet之间的通信,实际上是spdy协议,让我们一起看下关键实现吧

3.1 Connection

image.png

Exec的SPDY请求会首先发送到Connect接口, Connection接口负责跟后端的kubelet进行链接的建立,并且进行响应结果的返回,在Connection接口中,首先会通过Pod获取到对应的Node信息,并且构建Location即后端的Kubelet的链接地址和transport

func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
    execOpts, ok := opts.(*api.PodExecOptions)
    if !ok {
        return nil, fmt.Errorf("invalid options object: %#v", opts)
    }
    // 返回对应的地址,以及建立链接
    location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
    if err != nil {
        return nil, err
    }
    return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

3.2 获取后端服务地址

在获取地址主要是构建后端的location信息,这里会通过kubelet上报来的信息获取到对应的node的host和Port信息,并且拼装出pod的最终指向路径即这里的Path字段/exec/{namespace}/{podName}/{containerName}

    loc := &url.URL{
        Scheme:   nodeInfo.Scheme,
        Host:     net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),    // node的端口
        Path:     fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),    // 路径
        RawQuery: params.Encode(),
    }

3.3 协议提升handler初始化

协议提升主要是通过UpgradeAwareHandler控制器进行实现, 该handler接收到请求之后会首先尝试进行协议提升,其主要是检测http头里面的Connection的值是不是Upragde来实现, 从之前kubelet的分析中可以知道这里肯定是true

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler {

    handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
    handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects)
    handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects)
    handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
    return handler
}

func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // 如果协议提升成功,则由该协议完成
    if h.tryUpgrade(w, req) {
        return
    }
    // 省略N多代码
}

3.4 协议提升处理

image.png

协议提升处理的逻辑比较多,这里分为几个小节来进行依次说明, 主要是先从HTTP链接中获取请求,并进行转发,然后同时持有两个链接,并且在链接上进行TCP流的拷贝

3.4.1 与kubelet建立链接

协议提升的第一步就是与后端的kubelet建立链接了,这里会将kubelet发过来的请求进行拷贝,并且发送给后端的kubelet, 并且这里也会获取到一个与kubelet建立的http的链接,后面进行流对拷的时候需要用到, 注意实际上这个http请求响应的状态码,是101,即kubelet上实际上是构建了一个spdy协议的handler来进行通信的

        // 构建http请求
        req, err := http.NewRequest(method, location.String(), body)
        if err != nil {
            return nil, nil, err
        }

        req.Header = header

        // 发送请求建立链接
        intermediateConn, err = dialer.Dial(req)
        if err != nil {
            return nil, nil, err
        }

        // Peek at the backend response.
        rawResponse.Reset()
        respReader := bufio.NewReader(io.TeeReader(
            io.LimitReader(intermediateConn, maxResponseSize), // Don't read more than maxResponseSize bytes.
            rawResponse)) // Save the raw response.
            // 读取响应信息
        resp, err := http.ReadResponse(respReader, nil)

3.4.2 Request请求的Hijack

这个请求实际上是spdy协议的,在通过Hijack获取到底层的链接之后,需要先将上面的请求转发给kubelet从而触发kubelet发送后面的Stream请求建立链接,就是这里的Write将kubelet的结果转发

    requestHijackedConn, _, err := requestHijacker.Hijack()
    // Forward raw response bytes back to client.
    if len(rawResponse) > 0 {
        klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
        if _, err = requestHijackedConn.Write(rawResponse); err != nil {
            utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
        }
    }

3.4.3 双向流对拷

经过上面的两步操作,apiserver上就拥有来了两个http链接,因为协议不是http的所以apiserver不能直接进行操作,而只能采用流对拷的方式来进行请求和响应的转发

    // 双向拷贝链接
    go func() {
        var writer io.WriteCloser
        if h.MaxBytesPerSec > 0 {
            writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
        } else {
            writer = backendConn
        }
        _, err := io.Copy(writer, requestHijackedConn)
        if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
            klog.Errorf("Error proxying data from client to backend: %v", err)
        }
        close(writerComplete)
    }()

    go func() {
        var reader io.ReadCloser
        if h.MaxBytesPerSec > 0 {
            reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
        } else {
            reader = backendConn
        }
        _, err := io.Copy(requestHijackedConn, reader)
        if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
            klog.Errorf("Error proxying data from backend to client: %v", err)
        }
        close(readerComplete)
    }()

4.kubelet

Kubelet上的命令执行主要是依赖于CRI.RuntimeService来执行的,kubelet只负责对应请求的转发,并最终构建一个转发后续请求的Stream代理,就完成了他的使命

4.1 执行命令主流程

主流程主要是获取要执行的命令,然后检测对应的Pod新,并调用host.GetExec返回一个对应的URL,然后后续的请求就由proxyStream来完成, 我们一步步开始深入

func (s *Server) getExec(request *restful.Request, response *restful.Response) {
    // 获取执行命令
    params := getExecRequestParams(request)
    streamOpts, err := remotecommandserver.NewOptions(request.Request)
    // 获取pod的信息
    pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
    podFullName := kubecontainer.GetPodFullName(pod)
    url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
    proxyStream(response.ResponseWriter, request.Request, url)
}

4.2 Exec返回执行结果

host.GetExec最终会调用到runtimeService即cri.RuntimeService的Exec接口来进行请求的执行,该接口会返回一个地址即/exec/{token},此时并没有执行真正的命令只是创建了一个命令执行请求而已

func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
    // 省略请求构造
    // 执行命令
    resp, err := m.runtimeService.Exec(req)
    return url.Parse(resp.Url)
}

最终其实就是调用cri的的exec接口, 我们先忽略该接口具体返回的啥,将kubelet剩余的逻辑看完

func (c *runtimeServiceClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) {
    err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/Exec", in, out, opts...)
}

4.3 proxyStream

image.png

这里我们可以发现,又是我们之前见过的UpgradeAwareHandler,不过这次的url是后端exec执行返回的url了,然后剩下部分就跟apiserver里面的差不多,在两个http链接之间进行流对拷

我们想一下这个地方Request和Response,其实是对应的apiserver与kubelet建立的链接,这个链接上是spdy的头,记住这个地方, 则此时又跟后端继续建立链接,后端其实也是一个spdy协议的server, 至此我们还差最后一个部分就是返回的那个链接到底是啥,对应的控制器又是谁,进行下一节cri部分

// proxyStream proxies stream to url.
func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) {
    // TODO(random-liu): Set MaxBytesPerSec to throttle the stream.
    handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{})
    handler.ServeHTTP(w, r)
}

5.CRI

CRI.RuntimeService负责最终的命令执行,也是命令执行真正执行的位置,其中也涉及到很多的协议处理相关的操作,让我们一起来看下关键实现吧

5.1 DockerRuntime的注册

在上面我们调用了RuntimeService的Exec接口,在kubelet中最终发现如下代码,创建了一个DockerServer并启动

ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
        if err := dockerServer.Start(); err != nil {
            return err
        }

其中在Start函数里面,注册了下面两个RuntimeService,写过grpc的朋友都知道,这个其实就是注册对应rpc接口的实现,其实最终我们调用的是DockerService的接口

    runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
    runtimeapi.RegisterImageServiceServer(s.server, s.service)

5.2 DockerService的Exec实现

Exec最终的实现可以发现实际上是调用streamingServer的GetExec接口,返回了一个/exec/{token}的接口

func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
    // 执行Exec请求
    return ds.streamingServer.GetExec(req)
}

我们继续追踪streamingServer可以看到GetExec接口实现如下, 最终build了一个url=/exec/{token},注意这里实际上存储了当前的Request请求在cache中

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
    // 生成token
    token, err := s.cache.Insert(req)
    return &runtimeapi.ExecResponse{
        Url: s.buildURL("exec", token),
    }, nil
}

5.3 构建命令参数执行Exec

首先通过token来获取之前缓存的Request,然后通过exec请求命令,构建StreamOpts,并最终调用ServeExec进行执行,接下来就是最不容易看懂的部分了,前方高能

func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
    // 获取token
    token := req.PathParameter("token")
    // 缓存请求
    cachedRequest, ok := s.cache.Consume(token)
    // 构建exec参数s
    exec, ok := cachedRequest.(*runtimeapi.ExecRequest)

    streamOpts := &remotecommandserver.Options{
        Stdin:  exec.Stdin,
        Stdout: exec.Stdout,
        Stderr: exec.Stderr,
        TTY:    exec.Tty,
    }

    // 构建ServerExec执行请求
    remotecommandserver.ServeExec(
        resp.ResponseWriter,
        req.Request,
        s.runtime,
        "", // unused: podName
        "", // unusued: podUID
        exec.ContainerId,
        exec.Cmd,
        streamOpts,
        s.config.StreamIdleTimeout,
        s.config.StreamCreationTimeout,
        s.config.SupportedRemoteCommandProtocols)
}

5.4 ServerExec

ServerExec关键步骤就两个:1)创建stream 2)执行请求, 比较复杂的主要是集中在创建stream部分,我们注意下ExecInContainer的参数部分,传入了通过创建流获取的ctx的相关文件描述符的Stream, createStreams里面的实现有两种协议websocket和https,这里我们主要分析https(我们使用kubectl使用的就是https协议)

func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
    // 创建serveExec
    ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)

    defer ctx.conn.Close()

    // 获取执行,这是一个阻塞的过程,err会获取当前的执行是否成功, 这里将ctx里面的信息,都传入进去,对应的其实就是各种流
    err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)

}

5.5 创建HTTPS Stream

Stream的建立我将其概括成下面几个步骤:1)进行https的握手 2)协议升级为spdy 3)等待stream的建立,我们依次来看

1.完成https的握手
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
2.协议提升
    // 流管道
    streamCh := make(chan streamAndReply)

    upgrader := spdy.NewResponseUpgrader()
    // 构建spdy链接
    conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
        // 当新请求建立之后,会追加到streamch
        streamCh <- streamAndReply{Stream: stream, replySent: replySent}
        return nil
    })

这里有一个关键机制就是后面func回调函数的传递和streamch的传递,这里建立一个链接之后会创建一个Server,并且传入了一个控制器就是func回调函数,该函数每当建立一个链接之后,如果获取到对应的stream就追加到StreamCh中,下面就是最复杂的网络处理部分了,因为太复杂,所以还是单独开一节吧

5.6 spdy stream的建立

image.png

总体流程上看起来简单,主要是先根据请求来进行协议切换,然后返回101,并且基于当前的链接构建SPDY的请求处理,然后等待kubectl通过apiserver发送的需要建立的Stream,就完成了彼此通信流stream的建立

5.6.1 进行协议提升响应

首先第一步会先进行协议提升的响应,这里我们注意几个关键部分,spdy协议,以及101状态码

    // 协议
    hijacker, ok := w.(http.Hijacker)
    if !ok {
        errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response")
        http.Error(w, errorMsg, http.StatusInternalServerError)
        return nil
    }

    w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
    // sydy协议
    w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31)
    w.WriteHeader(http.StatusSwitchingProtocols)

5.6.2 建立spdyServer

spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)

最终会通过newConnection负责新链接的建立

func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
    // 创建一个新的链接, 通过一个已经存在的网络链接
    spdyConn, err := spdystream.NewConnection(conn, true)

    return newConnection(spdyConn, newStreamHandler), nil
}

这里我们可以看到是启动一个后台的server来进行链接请求的处理

func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
    c := &connection{conn: conn, newStreamHandler: newStreamHandler}
    // 当建立链接后,进行syn请求创建流的时候,会调用newSpdyStream
    go conn.Serve(c.newSpdyStream)
    return c
}

5.6.3 Serve

1.首先会启动多个goroutine来负责请求的处理,这里的worker数量是5个,队列大小是20,
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
    for i := 0; i < FRAME_WORKERS; i++ {
        frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)

        // Ensure frame queue is drained when connection is closed
        go func(frameQueue *PriorityFrameQueue) {
            <-s.closeChan
            frameQueue.Drain()
        }(frameQueues[i])

        wg.Add(1)
        go func(frameQueue *PriorityFrameQueue) {
            // let the WaitGroup know this worker is done
            defer wg.Done()

            s.frameHandler(frameQueue, newHandler)
        }(frameQueues[i])
    }
2.监听synStreamFrame,分流frame,会按照frame的streamID来进行hash选择对应的frameQueues队列
        case *spdy.SynStreamFrame:
            if s.checkStreamFrame(frame) {
                priority = frame.Priority
                partition = int(frame.StreamId % FRAME_WORKERS)
                debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
                // 添加到对应的StreamId对应的frame里面
                s.addStreamFrame(frame)
            } else {
                debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
                continue
            
                // 最终会讲frame push到上面的优先级队列里面
                frameQueues[partition].Push(readFrame, priority)
3.读取frame进行并把读取到的stream通过newHandler传递给上面的StreamCH
func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
    for {
        popFrame := frameQueue.Pop()
        if popFrame == nil {
            return
        }

        var frameErr error
        switch frame := popFrame.(type) {
        case *spdy.SynStreamFrame:
            frameErr = s.handleStreamFrame(frame, newHandler)
    }
}

消费的流到下一节

5.7 等待建立Stream

Stream的等待建立主要是通过Headers里面的StreamType来实现,这里面会讲对应的stdinStream和对应的spdy里面的stream绑定,其他类型也是这样


func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
    ctx := &context{}
    receivedStreams := 0
    replyChan := make(chan struct{})
    stop := make(chan struct{})
    defer close(stop)
WaitForStreams:
    for {
        select {
        case stream := <-streams:
            streamType := stream.Headers().Get(api.StreamType)
            switch streamType {
            case api.StreamTypeError:
                ctx.writeStatus = v1WriteStatusFunc(stream)
                go waitStreamReply(stream.replySent, replyChan, stop)
            case api.StreamTypeStdin:
                ctx.stdinStream = stream
                go waitStreamReply(stream.replySent, replyChan, stop)
            case api.StreamTypeStdout:
                ctx.stdoutStream = stream
                go waitStreamReply(stream.replySent, replyChan, stop)
            case api.StreamTypeStderr:
                ctx.stderrStream = stream
                go waitStreamReply(stream.replySent, replyChan, stop)
            case api.StreamTypeResize:
                ctx.resizeStream = stream
                go waitStreamReply(stream.replySent, replyChan, stop)
            default:
                runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType))
            }
        case <-replyChan:
            receivedStreams++
            if receivedStreams == expectedStreams {
                break WaitForStreams
            }
        case <-expired:
            // TODO find a way to return the error to the user. Maybe use a separate
            // stream to report errors?
            return nil, errors.New("timed out waiting for client to create streams")
        }
    }

    return ctx, nil
}

5.8 CRI适配器执行命令

跟踪调用链最终可以看到如下的调用,最终指向了execHandler.ExecInContainer接口用于在容器中执行命令

func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    // 执行command
    return a.Runtime.Exec(container, cmd, in, out, err, tty, resize)
}
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
    //  执行容器
    return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}

// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    // exechandler
    return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}

5.9 命令执行主流程

命令的指向的主流程主要分为两个部分:1)创建exec执行任务 2)启动exec执行任务

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    // 在容器中执行命令
    done := make(chan struct{})
    defer close(done)

    // 执行命令
    createOpts := dockertypes.ExecConfig{
        Cmd:          cmd,
        AttachStdin:  stdin != nil,
        AttachStdout: stdout != nil,
        AttachStderr: stderr != nil,
        Tty:          tty,
    }
    // 创建执行命令任务
    execObj, err := client.CreateExec(container.ID, createOpts)

    startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
    // 这里我们可以看到我们将前面获取到的stream的封装,都作为FD传入到容器的执行命令里面去了
    streamOpts := libdocker.StreamOptions{
        InputStream:  stdin,
        OutputStream: stdout,
        ErrorStream:  stderr,
        RawTerminal:  tty,
        ExecStarted:  execStarted,
    }
    // 执行命令
    err = client.StartExec(execObj.ID, startOpts, streamOpts)
    
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    count := 0
    for {
        //  获取执行结果
        inspect, err2 := client.InspectExec(execObj.ID)
        if !inspect.Running {
            if inspect.ExitCode != 0 {
                err = &dockerExitError{inspect}
            }
            break
        }
        <-ticker.C
    }

    return err
}
Docker的命令执行接口调用
func (cli *Client) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) {
    resp, err := cli.post(ctx, "/containers/"+container+"/exec", nil, config, nil)
    return response, err
}

5.10 命令执行核心实现

image.png

命令执行的核心实现主要是两个步骤:1)首先发送exec执行请求 2)启动对应的exec并获取结果, 复杂的还是SPDY相关的Stream的逻辑

func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
    // 启动执行命令, 获取结果
    resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{
        Detach: opts.Detach,
        Tty:    opts.Tty,
    })
    // 将输入流拷贝到输出流, 这里会讲resp里面的结果拷贝到outputSTream里面
    return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
}

5.10.1 命令执行请求

cli.postHijacked(ctx, "/exec/"+execID+"/start", nil, config, headers)

5.10.2 发送请求获取连接

这里的HiHijackConn功能跟之前介绍的类似,其核心也是通过建立http链接,然后进行协议提升,其conn就是底层的tcp链接,同时还给对应的链接设置了Keepliave当前是30s, 到此我们就又有了一个基于spdy双向通信的链接

func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) {
    conn, err := cli.setupHijackConn(ctx, req, "tcp")
    return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
}

5.10.3 建立流对拷

至此在kubelet上面我们获取到了与后端执行命令的Stream还有与apiserver建立的Stream, 此时就只需要将两个流直接进行拷贝,就可以实现数据的传输了

func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error {
    receiveStdout := make(chan error)
    if outputStream != nil || errorStream != nil {
        // 将响应结果拷贝到outputstream里面
        go func() {
            receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader)
        }()
    }

    stdinDone := make(chan struct{})
    go func() {
        if inputStream != nil {
            io.Copy(resp.Conn, inputStream)
        }
        resp.CloseWrite()
        close(stdinDone)
    }()

    return nil
}

5.10.4 检测执行状态

image.png
在发生完成执行命令以后,会每隔2s钟进行一次执行状态的检查,如果发现执行完成,则就直接退出

func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    count := 0
    for {
        //  获取执行结果
        inspect, err2 := client.InspectExec(execObj.ID)
        if err2 != nil {
            return err2
        }
        if !inspect.Running {
            if inspect.ExitCode != 0 {
                err = &dockerExitError{inspect}
            }
            break
        }
        <-ticker.C
    }

    return err
}

func (cli *Client) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) {
    resp, err := cli.get(ctx, "/exec/"+execID+"/json", nil, nil)
    return response, err
}

6.总结

image.png

整个命令执行的过程其实还是蛮复杂的,主要是在于网络协议切换那部分,我们可以看到其实在整个过程中,都是基于SPDY协议来进行的,并且在CRI.RuntimeService那部分我们也可以看到Stream的请求处理其实也是多goroutine并发的,仰慕一下大牛的设计,有什么写的不对的地方,欢迎一起讨论,谢谢大佬们能看到这里

kubernetes学习笔记地址: https://www.yuque.com/baxiaos...

微信号:baxiaoshi2020
关注公告号阅读更多源码分析文章 图解源码
本文由博客一文多发平台 OpenWrite 发布!

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:仔仔

查看原文:图解kubernetes命令执行核心实现

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

615 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传