1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
kube_docker_client.go 20.78 KB
一键复制 编辑 原始数据 按行查看 历史
Davanum Srinivas 提交于 2018-11-09 13:49 . Move from glog to klog
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package libdocker
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"regexp"
"sync"
"time"
"k8s.io/klog"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
dockerimagetypes "github.com/docker/docker/api/types/image"
dockerapi "github.com/docker/docker/client"
dockermessage "github.com/docker/docker/pkg/jsonmessage"
dockerstdcopy "github.com/docker/docker/pkg/stdcopy"
)
// kubeDockerClient is a wrapped layer of docker client for kubelet internal use. This layer is added to:
// 1) Redirect stream for exec and attach operations.
// 2) Wrap the context in this layer to make the Interface cleaner.
type kubeDockerClient struct {
// timeout is the timeout of short running docker operations.
timeout time.Duration
// If no pulling progress is made before imagePullProgressDeadline, the image pulling will be cancelled.
// Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
// between progress updates.
imagePullProgressDeadline time.Duration
client *dockerapi.Client
}
// Make sure that kubeDockerClient implemented the Interface.
var _ Interface = &kubeDockerClient{}
// There are 2 kinds of docker operations categorized by running time:
// * Long running operation: The long running operation could run for arbitrary long time, and the running time
// usually depends on some uncontrollable factors. These operations include: PullImage, Logs, StartExec, AttachToContainer.
// * Non-long running operation: Given the maximum load of the system, the non-long running operation should finish
// in expected and usually short time. These include all other operations.
// kubeDockerClient only applies timeout on non-long running operations.
const (
// defaultTimeout is the default timeout of short running docker operations.
// Value is slightly offset from 2 minutes to make timeouts due to this
// constant recognizable.
defaultTimeout = 2*time.Minute - 1*time.Second
// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
defaultShmSize = int64(1024 * 1024 * 64)
// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
defaultImagePullingProgressReportInterval = 10 * time.Second
)
// newKubeDockerClient creates an kubeDockerClient from an existing docker client. If requestTimeout is 0,
// defaultTimeout will be applied.
func newKubeDockerClient(dockerClient *dockerapi.Client, requestTimeout, imagePullProgressDeadline time.Duration) Interface {
if requestTimeout == 0 {
requestTimeout = defaultTimeout
}
k := &kubeDockerClient{
client: dockerClient,
timeout: requestTimeout,
imagePullProgressDeadline: imagePullProgressDeadline,
}
// Notice that this assumes that docker is running before kubelet is started.
v, err := k.Version()
if err != nil {
klog.Errorf("failed to retrieve docker version: %v", err)
klog.Warningf("Using empty version for docker client, this may sometimes cause compatibility issue.")
} else {
// Update client version with real api version.
dockerClient.NegotiateAPIVersionPing(dockertypes.Ping{APIVersion: v.APIVersion})
}
return k
}
func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
containers, err := d.client.ContainerList(ctx, options)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return containers, nil
}
func (d *kubeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
containerJSON, err := d.client.ContainerInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &containerJSON, nil
}
// InspectContainerWithSize is currently only used for Windows container stats
func (d *kubeDockerClient) InspectContainerWithSize(id string) (*dockertypes.ContainerJSON, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
// Inspects the container including the fields SizeRw and SizeRootFs.
containerJSON, _, err := d.client.ContainerInspectWithRaw(ctx, id, true)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &containerJSON, nil
}
func (d *kubeDockerClient) CreateContainer(opts dockertypes.ContainerCreateConfig) (*dockercontainer.ContainerCreateCreatedBody, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
// we provide an explicit default shm size as to not depend on docker daemon.
// TODO: evaluate exposing this as a knob in the API
if opts.HostConfig != nil && opts.HostConfig.ShmSize <= 0 {
opts.HostConfig.ShmSize = defaultShmSize
}
createResp, err := d.client.ContainerCreate(ctx, opts.Config, opts.HostConfig, opts.NetworkingConfig, opts.Name)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &createResp, nil
}
func (d *kubeDockerClient) StartContainer(id string) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
// Stopping an already stopped container will not cause an error in dockerapi.
func (d *kubeDockerClient) StopContainer(id string, timeout time.Duration) error {
ctx, cancel := d.getCustomTimeoutContext(timeout)
defer cancel()
err := d.client.ContainerStop(ctx, id, &timeout)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerRemove(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) UpdateContainerResources(id string, updateConfig dockercontainer.UpdateConfig) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
_, err := d.client.ContainerUpdate(ctx, id, updateConfig)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
func (d *kubeDockerClient) inspectImageRaw(ref string) (*dockertypes.ImageInspect, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, _, err := d.client.ImageInspectWithRaw(ctx, ref)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
if dockerapi.IsErrNotFound(err) {
err = ImageNotFoundError{ID: ref}
}
return nil, err
}
return &resp, nil
}
func (d *kubeDockerClient) InspectImageByID(imageID string) (*dockertypes.ImageInspect, error) {
resp, err := d.inspectImageRaw(imageID)
if err != nil {
return nil, err
}
if !matchImageIDOnly(*resp, imageID) {
return nil, ImageNotFoundError{ID: imageID}
}
return resp, nil
}
func (d *kubeDockerClient) InspectImageByRef(imageRef string) (*dockertypes.ImageInspect, error) {
resp, err := d.inspectImageRaw(imageRef)
if err != nil {
return nil, err
}
if !matchImageTagOrSHA(*resp, imageRef) {
return nil, ImageNotFoundError{ID: imageRef}
}
return resp, nil
}
func (d *kubeDockerClient) ImageHistory(id string) ([]dockerimagetypes.HistoryResponseItem, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ImageHistory(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
return resp, err
}
func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.ImageSummary, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
images, err := d.client.ImageList(ctx, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return images, nil
}
func base64EncodeAuth(auth dockertypes.AuthConfig) (string, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(auth); err != nil {
return "", err
}
return base64.URLEncoding.EncodeToString(buf.Bytes()), nil
}
// progress is a wrapper of dockermessage.JSONMessage with a lock protecting it.
type progress struct {
sync.RWMutex
// message stores the latest docker json message.
message *dockermessage.JSONMessage
// timestamp of the latest update.
timestamp time.Time
}
func newProgress() *progress {
return &progress{timestamp: time.Now()}
}
func (p *progress) set(msg *dockermessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.message = msg
p.timestamp = time.Now()
}
func (p *progress) get() (string, time.Time) {
p.RLock()
defer p.RUnlock()
if p.message == nil {
return "No progress", p.timestamp
}
// The following code is based on JSONMessage.Display
var prefix string
if p.message.ID != "" {
prefix = fmt.Sprintf("%s: ", p.message.ID)
}
if p.message.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp
}
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp
}
// progressReporter keeps the newest image pulling progress and periodically report the newest progress.
type progressReporter struct {
*progress
image string
cancel context.CancelFunc
stopCh chan struct{}
imagePullProgressDeadline time.Duration
}
// newProgressReporter creates a new progressReporter for specific image with specified reporting interval
func newProgressReporter(image string, cancel context.CancelFunc, imagePullProgressDeadline time.Duration) *progressReporter {
return &progressReporter{
progress: newProgress(),
image: image,
cancel: cancel,
stopCh: make(chan struct{}),
imagePullProgressDeadline: imagePullProgressDeadline,
}
}
// start starts the progressReporter
func (p *progressReporter) start() {
go func() {
ticker := time.NewTicker(defaultImagePullingProgressReportInterval)
defer ticker.Stop()
for {
// TODO(random-liu): Report as events.
select {
case <-ticker.C:
progress, timestamp := p.progress.get()
// If there is no progress for p.imagePullProgressDeadline, cancel the operation.
if time.Since(timestamp) > p.imagePullProgressDeadline {
klog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, p.imagePullProgressDeadline, progress)
p.cancel()
return
}
klog.V(2).Infof("Pulling image %q: %q", p.image, progress)
case <-p.stopCh:
progress, _ := p.progress.get()
klog.V(2).Infof("Stop pulling image %q: %q", p.image, progress)
return
}
}
}()
}
// stop stops the progressReporter
func (p *progressReporter) stop() {
close(p.stopCh)
}
func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error {
// RegistryAuth is the base64 encoded credentials for the registry
base64Auth, err := base64EncodeAuth(auth)
if err != nil {
return err
}
opts.RegistryAuth = base64Auth
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ImagePull(ctx, image, opts)
if err != nil {
return err
}
defer resp.Close()
reporter := newProgressReporter(image, cancel, d.imagePullProgressDeadline)
reporter.start()
defer reporter.stop()
decoder := json.NewDecoder(resp)
for {
var msg dockermessage.JSONMessage
err := decoder.Decode(&msg)
if err == io.EOF {
break
}
if err != nil {
return err
}
if msg.Error != nil {
return msg.Error
}
reporter.set(&msg)
}
return nil
}
func (d *kubeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ImageRemove(ctx, image, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if isImageNotFoundError(err) {
return nil, ImageNotFoundError{ID: image}
}
return resp, err
}
func (d *kubeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ContainerLogs(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
defer resp.Close()
return d.redirectResponseToOutputStream(sopts.RawTerminal, sopts.OutputStream, sopts.ErrorStream, resp)
}
func (d *kubeDockerClient) Version() (*dockertypes.Version, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ServerVersion(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &resp, nil
}
func (d *kubeDockerClient) Info() (*dockertypes.Info, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.Info(ctx)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &resp, nil
}
// TODO(random-liu): Add unit test for exec and attach functions, just like what go-dockerclient did.
func (d *kubeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*dockertypes.IDResponse, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ContainerExecCreate(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &resp, nil
}
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
if opts.Detach {
err := d.client.ContainerExecStart(ctx, startExec, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{
Detach: opts.Detach,
Tty: opts.Tty,
})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
defer resp.Close()
if sopts.ExecStarted != nil {
// Send a message to the channel indicating that the exec has started. This is needed so
// interactive execs can handle resizing correctly - the request to resize the TTY has to happen
// after the call to d.client.ContainerExecAttach, and because d.holdHijackedConnection below
// blocks, we use sopts.ExecStarted to signal the caller that it's ok to resize.
sopts.ExecStarted <- struct{}{}
}
return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
}
func (d *kubeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecInspect, error) {
ctx, cancel := d.getTimeoutContext()
defer cancel()
resp, err := d.client.ContainerExecInspect(ctx, id)
if ctxErr := contextError(ctx); ctxErr != nil {
return nil, ctxErr
}
if err != nil {
return nil, err
}
return &resp, nil
}
func (d *kubeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
resp, err := d.client.ContainerAttach(ctx, id, opts)
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
if err != nil {
return err
}
defer resp.Close()
return d.holdHijackedConnection(sopts.RawTerminal, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp)
}
func (d *kubeDockerClient) ResizeExecTTY(id string, height, width uint) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
return d.client.ContainerExecResize(ctx, id, dockertypes.ResizeOptions{
Height: height,
Width: width,
})
}
func (d *kubeDockerClient) ResizeContainerTTY(id string, height, width uint) error {
ctx, cancel := d.getCancelableContext()
defer cancel()
return d.client.ContainerResize(ctx, id, dockertypes.ResizeOptions{
Height: height,
Width: width,
})
}
// GetContainerStats is currently only used for Windows container stats
func (d *kubeDockerClient) GetContainerStats(id string) (*dockertypes.StatsJSON, error) {
ctx, cancel := d.getCancelableContext()
defer cancel()
response, err := d.client.ContainerStats(ctx, id, false)
if err != nil {
return nil, err
}
dec := json.NewDecoder(response.Body)
var stats dockertypes.StatsJSON
err = dec.Decode(&stats)
if err != nil {
return nil, err
}
defer response.Body.Close()
return &stats, nil
}
// redirectResponseToOutputStream redirect the response stream to stdout and stderr. When tty is true, all stream will
// only be redirected to stdout.
func (d *kubeDockerClient) redirectResponseToOutputStream(tty bool, outputStream, errorStream io.Writer, resp io.Reader) error {
if outputStream == nil {
outputStream = ioutil.Discard
}
if errorStream == nil {
errorStream = ioutil.Discard
}
var err error
if tty {
_, err = io.Copy(outputStream, resp)
} else {
_, err = dockerstdcopy.StdCopy(outputStream, errorStream, resp)
}
return err
}
// holdHijackedConnection hold the HijackedResponse, redirect the inputStream to the connection, and redirect the response
// stream to stdout and stderr. NOTE: If needed, we could also add context in this function.
func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error {
receiveStdout := make(chan error)
if outputStream != nil || errorStream != nil {
go func() {
receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader)
}()
}
stdinDone := make(chan struct{})
go func() {
if inputStream != nil {
io.Copy(resp.Conn, inputStream)
}
resp.CloseWrite()
close(stdinDone)
}()
select {
case err := <-receiveStdout:
return err
case <-stdinDone:
if outputStream != nil || errorStream != nil {
return <-receiveStdout
}
}
return nil
}
// getCancelableContext returns a new cancelable context. For long running requests without timeout, we use cancelable
// context to avoid potential resource leak, although the current implementation shouldn't leak resource.
func (d *kubeDockerClient) getCancelableContext() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}
// getTimeoutContext returns a new context with default request timeout
func (d *kubeDockerClient) getTimeoutContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), d.timeout)
}
// getCustomTimeoutContext returns a new context with a specific request timeout
func (d *kubeDockerClient) getCustomTimeoutContext(timeout time.Duration) (context.Context, context.CancelFunc) {
// Pick the larger of the two
if d.timeout > timeout {
timeout = d.timeout
}
return context.WithTimeout(context.Background(), timeout)
}
// contextError checks the context, and returns error if the context is timeout.
func contextError(ctx context.Context) error {
if ctx.Err() == context.DeadlineExceeded {
return operationTimeout{err: ctx.Err()}
}
return ctx.Err()
}
// StreamOptions are the options used to configure the stream redirection
type StreamOptions struct {
RawTerminal bool
InputStream io.Reader
OutputStream io.Writer
ErrorStream io.Writer
ExecStarted chan struct{}
}
// operationTimeout is the error returned when the docker operations are timeout.
type operationTimeout struct {
err error
}
func (e operationTimeout) Error() string {
return fmt.Sprintf("operation timeout: %v", e.err)
}
// containerNotFoundErrorRegx is the regexp of container not found error message.
var containerNotFoundErrorRegx = regexp.MustCompile(`No such container: [0-9a-z]+`)
// IsContainerNotFoundError checks whether the error is container not found error.
func IsContainerNotFoundError(err error) bool {
return containerNotFoundErrorRegx.MatchString(err.Error())
}
// ImageNotFoundError is the error returned by InspectImage when image not found.
// Expose this to inject error in dockershim for testing.
type ImageNotFoundError struct {
ID string
}
func (e ImageNotFoundError) Error() string {
return fmt.Sprintf("no such image: %q", e.ID)
}
// IsImageNotFoundError checks whether the error is image not found error. This is exposed
// to share with dockershim.
func IsImageNotFoundError(err error) bool {
_, ok := err.(ImageNotFoundError)
return ok
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.13.4

搜索帮助