/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
// Note: if you change code in this file, you might need to change code in
// contrib/mesos/pkg/executor/.
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/rkt"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
utilErrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/third_party/golang/expansion"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 5 * time.Minute
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// Location of container logs.
containerLogsDir = "/var/log/containers"
)
var (
// ErrContainerNotFound is returned when a container with the given name is not
// found in the given pod, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specifies what
// type of sync is occurring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType, mirrorPods map[string]*api.Pod,
startTime time.Time) error
}
type SourcesReadyFn func() bool
// Wait for the container runtime to be up with a timeout.
func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error {
var err error
waitStart := time.Now()
for time.Since(waitStart) < timeout {
_, err = cr.Version()
if err == nil {
return nil
}
time.Sleep(100 * time.Millisecond)
}
return err
}
// New creates a new Kubelet for use in main
func NewMainKubelet(
hostname string,
nodeName string,
dockerClient dockertools.DockerInterface,
kubeClient client.Interface,
rootDirectory string,
podInfraContainerImage string,
resyncInterval time.Duration,
pullQPS float32,
pullBurst int,
containerGCPolicy ContainerGCPolicy,
sourcesReady SourcesReadyFn,
registerNode bool,
standaloneMode bool,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string,
volumePlugins []volume.VolumePlugin,
networkPlugins []network.NetworkPlugin,
networkPluginName string,
streamingConnectionIdleTimeout time.Duration,
recorder record.EventRecorder,
cadvisorInterface cadvisor.Interface,
imageGCPolicy ImageGCPolicy,
diskSpacePolicy DiskSpacePolicy,
cloud cloudprovider.Interface,
nodeStatusUpdateFrequency time.Duration,
resourceContainer string,
osInterface kubecontainer.OSInterface,
cgroupRoot string,
containerRuntime string,
mounter mount.Interface,
dockerDaemonContainer string,
systemContainer string,
configureCBR0 bool,
podCIDR string,
pods int,
dockerExecHandler dockertools.ExecHandler) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
if resyncInterval <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
}
if systemContainer != "" && cgroupRoot == "" {
return nil, fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
}
dockerClient = dockertools.NewInstrumentedDockerInterface(dockerClient)
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Services(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
}
serviceLister := &cache.StoreToServiceLister{serviceStore}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.Nodes().List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{nodeStore}
// TODO: get the real minion object of ourself,
// and use the real minion name and UID.
// TODO: what is namespace for node?
nodeRef := &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
if err != nil {
return nil, err
}
imageManager, err := newImageManager(dockerClient, cadvisorInterface, recorder, nodeRef, imageGCPolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
diskSpaceManager, err := newDiskSpaceManager(cadvisorInterface, diskSpacePolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
statusManager := newStatusManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
volumeManager := newVolumeManager()
oomWatcher := NewOOMWatcher(cadvisorInterface, recorder)
klet := &Kubelet{
hostname: hostname,
nodeName: nodeName,
dockerClient: dockerClient,
kubeClient: kubeClient,
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
containerRefManager: containerRefManager,
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
standaloneMode: standaloneMode,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
nodeLister: nodeLister,
runtimeMutex: sync.Mutex{},
runtimeUpThreshold: maxWaitForContainerRuntime,
lastTimestampRuntimeUp: time.Time{},
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder,
cadvisor: cadvisorInterface,
containerGC: containerGC,
imageManager: imageManager,
diskSpaceManager: diskSpaceManager,
statusManager: statusManager,
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
oomWatcher: oomWatcher,
cgroupRoot: cgroupRoot,
mounter: mounter,
configureCBR0: configureCBR0,
podCIDR: podCIDR,
pods: pods,
syncLoopMonitor: util.AtomicValue{},
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}
// Initialize the runtime.
switch containerRuntime {
case "docker":
// Only supported one for now, continue.
klet.containerRuntime = dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
containerRefManager,
podInfraContainerImage,
pullQPS,
pullBurst,
containerLogsDir,
osInterface,
klet.networkPlugin,
klet,
klet.httpClient,
newKubeletRuntimeHooks(recorder),
dockerExecHandler)
case "rkt":
conf := &rkt.Config{InsecureSkipVerify: true}
rktRuntime, err := rkt.New(
conf,
klet,
recorder,
containerRefManager,
readinessManager,
klet.volumeManager)
if err != nil {
return nil, err
}
klet.containerRuntime = rktRuntime
// No Docker daemon to put in a container.
dockerDaemonContainer = ""
default:
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
}
// Setup container manager, can fail if the devices hierarchy is not mounted
// (it is required by Docker however).
containerManager, err := newContainerManager(cadvisorInterface, dockerDaemonContainer, systemContainer, resourceContainer)
if err != nil {
return nil, fmt.Errorf("failed to create the Container Manager: %v", err)
}
klet.containerManager = containerManager
go util.Until(klet.syncNetworkStatus, 30*time.Second, util.NeverStop)
if klet.kubeClient != nil {
// Start syncing node status immediately; this may set up things the runtime needs to run.
go util.Until(klet.syncNodeStatus, klet.nodeStatusUpdateFrequency, util.NeverStop)
}
// Wait for the runtime to be up with a timeout.
if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
}
klet.lastTimestampRuntimeUp = time.Now()
klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)
metrics.Register(runtimeCache)
if err = klet.setupDataDirs(); err != nil {
return nil, err
}
if err = klet.volumePluginMgr.InitPlugins(volumePlugins, &volumeHost{klet}); err != nil {
return nil, err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
}
}
return klet, nil
}
type serviceLister interface {
List() (api.ServiceList, error)
}
type nodeLister interface {
List() (machines api.NodeList, err error)
GetNodeInfo(id string) (*api.Node, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
nodeName string
dockerClient dockertools.DockerInterface
runtimeCache kubecontainer.RuntimeCache
kubeClient client.Interface
rootDirectory string
podWorkers PodWorkers
resyncInterval time.Duration
sourcesReady SourcesReadyFn
podManager podManager
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
containerRefManager *kubecontainer.RefManager
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
// Optional, defaults to simple Docker implementation
runner kubecontainer.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
httpClient kubeletTypes.HttpGetter
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
// Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
standaloneMode bool
// If non-empty, use this for container DNS search.
clusterDomain string
// If non-nil, use this for container DNS server.
clusterDNS net.IP
masterServiceNamespace string
serviceLister serviceLister
nodeLister nodeLister
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
runtimeMutex sync.Mutex
runtimeUpThreshold time.Duration
lastTimestampRuntimeUp time.Time
// Network Status information
networkConfigMutex sync.Mutex
networkConfigured bool
// Volume plugins.
volumePluginMgr volume.VolumePluginMgr
// Network plugin.
networkPlugin network.NetworkPlugin
// Container readiness state manager.
readinessManager *kubecontainer.ReadinessManager
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
streamingConnectionIdleTimeout time.Duration
// The EventRecorder to use
recorder record.EventRecorder
// Policy for handling garbage collection of dead containers.
containerGC containerGC
// Manager for images.
imageManager imageManager
// Diskspace manager.
diskSpaceManager diskSpaceManager
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorApi.MachineInfo
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager *statusManager
// Manager for the volume maps for the pods.
volumeManager *volumeManager
//Cloud provider interface
cloud cloudprovider.Interface
// Reference to this node.
nodeRef *api.ObjectReference
// Container runtime.
containerRuntime kubecontainer.Runtime
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
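// As a hypothetical illustration: with nodeStatusUpdateFrequency = 10s and
// nodeStatusUpdateRetry = 5, the node controller's nodeMonitorGracePeriod
// should be at least 5 * 10s = 50s, while still staying below podEvictionTimeout.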
nodeStatusUpdateFrequency time.Duration
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
os kubecontainer.OSInterface
// Watcher of out of memory events.
oomWatcher OOMWatcher
// If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string
// Mounter to use for volumes.
mounter mount.Interface
// Manager of non-Runtime containers.
containerManager containerManager
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
podCIDR string
// Number of Pods which can be run by this Kubelet
pods int
// Monitor Kubelet's sync loop
syncLoopMonitor util.AtomicValue
}
// getRootDir returns the full path to the directory under which kubelet can
// store data. These functions are useful to pass interfaces to other modules
// that may need to know where to write data without getting a whole kubelet
// instance.
func (kl *Kubelet) getRootDir() string {
return kl.rootDirectory
}
// getPodsDir returns the full path to the directory under which pod
// directories are created.
func (kl *Kubelet) getPodsDir() string {
return path.Join(kl.getRootDir(), "pods")
}
// getPluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getPluginsDir() string {
return path.Join(kl.getRootDir(), "plugins")
}
// getPluginDir returns a data directory name for a given plugin name.
// Plugins can use these directories to store data that they need to persist.
// For per-pod plugin data, see getPodPluginDir.
func (kl *Kubelet) getPluginDir(pluginName string) string {
return path.Join(kl.getPluginsDir(), pluginName)
}
// getPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
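// For illustration only (assuming a root directory of /var/lib/kubelet): the
// legacy layout used /var/lib/kubelet/<podUID>, while the current layout used
// by getPodsDir is /var/lib/kubelet/pods/<podUID>; the compatibility handling
// inside this function picks between the two.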
func (kl *Kubelet) getPodDir(podUID types.UID) string {
// Backwards compat. The "old" stuff should be removed before 1.0
// release. The thinking here is this:
// !old && !new = use new
// !old && new = use new
// old && !new = use old
// old && new = use new (but warn)
oldPath := path.Join(kl.getRootDir(), string(podUID))
oldExists := dirExists(oldPath)
newPath := path.Join(kl.getPodsDir(), string(podUID))
newExists := dirExists(newPath)
if oldExists && !newExists {
return oldPath
}
if oldExists {
glog.Warningf("Data dir for pod %q exists in both old and new form, using new", podUID)
}
return newPath
}
// getPodVolumesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumesDir(podUID types.UID) string {
return path.Join(kl.getPodDir(podUID), "volumes")
}
// getPodVolumeDir returns the full path to the directory which represents the
// named volume under the named plugin for specified pod. This directory may not
// exist if the pod does not exist.
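// For example (hypothetical values, assuming the current pods/ layout): a pod
// with UID "1234", plugin name "my-plugin" and volume "data" resolves to
// <rootDir>/pods/1234/volumes/my-plugin/data.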
func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
return path.Join(kl.getPodVolumesDir(podUID), pluginName, volumeName)
}
// getPodPluginsDir returns the full path to the per-pod data directory under
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodPluginsDir(podUID types.UID) string {
return path.Join(kl.getPodDir(podUID), "plugins")
}
// getPodPluginDir returns a data directory name for a given plugin name for a
// given pod UID. Plugins can use these directories to store data that they
// need to persist. For non-per-pod plugin data, see getPluginDir.
func (kl *Kubelet) getPodPluginDir(podUID types.UID, pluginName string) string {
return path.Join(kl.getPodPluginsDir(podUID), pluginName)
}
// getPodContainerDir returns the full path to the per-pod data directory under
// which container data is held for the specified pod. This directory may not
// exist if the pod or container does not exist.
func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
// Backwards compat. The "old" stuff should be removed before 1.0
// release. The thinking here is this:
// !old && !new = use new
// !old && new = use new
// old && !new = use old
// old && new = use new (but warn)
oldPath := path.Join(kl.getPodDir(podUID), ctrName)
oldExists := dirExists(oldPath)
newPath := path.Join(kl.getPodDir(podUID), "containers", ctrName)
newExists := dirExists(newPath)
if oldExists && !newExists {
return oldPath
}
if oldExists {
glog.Warningf("Data dir for pod %q, container %q exists in both old and new form, using new", podUID, ctrName)
}
return newPath
}
func dirExists(path string) bool {
s, err := os.Stat(path)
if err != nil {
return false
}
return s.IsDir()
}
func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = path.Clean(kl.rootDirectory)
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
return fmt.Errorf("error creating root directory: %v", err)
}
if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
return fmt.Errorf("error creating pods directory: %v", err)
}
if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
return fmt.Errorf("error creating plugins directory: %v", err)
}
return nil
}
// Get a list of pods that have data directories.
func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
podInfos, err := ioutil.ReadDir(kl.getPodsDir())
if err != nil {
return nil, err
}
pods := []types.UID{}
for i := range podInfos {
if podInfos[i].IsDir() {
pods = append(pods, types.UID(podInfos[i].Name()))
}
}
return pods, nil
}
func (kl *Kubelet) GetNode() (*api.Node, error) {
if kl.standaloneMode {
return nil, errors.New("no node entry for kubelet in standalone mode")
}
l, err := kl.nodeLister.List()
if err != nil {
return nil, errors.New("cannot list nodes")
}
nodeName := kl.nodeName
for _, n := range l.Items {
if n.Name == nodeName {
return &n, nil
}
}
return nil, fmt.Errorf("node %v not found", nodeName)
}
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
go util.Forever(func() {
if err := kl.containerGC.GarbageCollect(); err != nil {
glog.Errorf("Container garbage collection failed: %v", err)
}
}, time.Minute)
go util.Forever(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
glog.Errorf("Image garbage collection failed: %v", err)
}
}, 5*time.Minute)
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
// Move Kubelet to a container.
if kl.resourceContainer != "" {
err := util.RunInResourceContainer(kl.resourceContainer)
if err != nil {
glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)
}
glog.Infof("Running in container %q", kl.resourceContainer)
}
if err := kl.imageManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ImageManager %v", err)
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
if err := kl.cadvisor.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start CAdvisor %v", err)
glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
}
if err := kl.containerManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ContainerManager %v", err)
glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
}
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start OOM watcher %v", err)
glog.Errorf("Failed to start OOM watching: %v", err)
}
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
// Run the system oom watcher forever.
kl.statusManager.Start()
kl.syncLoop(updates, kl)
}
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.nodeName,
Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
},
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
} else {
node.Spec.ExternalID = kl.hostname
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
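// Registration is retried with exponential backoff: the delay between attempts
// starts at 100ms and doubles each time (100ms, 200ms, 400ms, ...), capped at 7s.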
func (kl *Kubelet) registerWithApiserver() {
if kl.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
kl.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := kl.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", kl.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithApiserver()
}
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
func makeMounts(container *api.Container, podVolumes kubecontainer.VolumeMap) (mounts []kubecontainer.Mount) {
for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name]
if !ok {
glog.Warningf("Mount cannot be satisified for container %q, because the volume is missing: %q", container.Name, mount)
continue
}
mounts = append(mounts, kubecontainer.Mount{
Name: mount.Name,
ContainerPath: mount.MountPath,
HostPath: vol.GetPath(),
ReadOnly: mount.ReadOnly,
})
}
return
}
func makePortMappings(container *api.Container) (ports []kubecontainer.PortMapping) {
names := make(map[string]struct{})
for _, p := range container.Ports {
pm := kubecontainer.PortMapping{
HostPort: p.HostPort,
ContainerPort: p.ContainerPort,
Protocol: p.Protocol,
HostIP: p.HostIP,
}
// We need to create some default port name if it's not specified, since
// this is necessary for rkt.
// https://github.com/GoogleCloudPlatform/kubernetes/issues/7710
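// For example (hypothetical values): a container named "web" with an unnamed
// TCP port 80 gets the generated name "web-TCP:80", while a port explicitly
// named "http" on the same container becomes "web-http".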
if p.Name == "" {
pm.Name = fmt.Sprintf("%s-%s:%d", container.Name, p.Protocol, p.ContainerPort)
} else {
pm.Name = fmt.Sprintf("%s-%s", container.Name, p.Name)
}
// Protect against exposing the same protocol-port more than once in a container.
if _, ok := names[pm.Name]; ok {
glog.Warningf("Port name conflicted, %q is defined more than once", pm.Name)
continue
}
ports = append(ports, pm)
names[pm.Name] = struct{}{}
}
return
}
// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container) (*kubecontainer.RunContainerOptions, error) {
var err error
opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}
vol, ok := kl.volumeManager.GetVolumes(pod.UID)
if !ok {
return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod))
}
opts.PortMappings = makePortMappings(container)
opts.Mounts = makeMounts(container, vol)
opts.Envs, err = kl.makeEnvironmentVariables(pod, container)
if err != nil {
return nil, err
}
if len(container.TerminationMessagePath) != 0 {
p := kl.getPodContainerDir(pod.UID, container.Name)
if err := os.MkdirAll(p, 0750); err != nil {
glog.Errorf("Error on creating %q: %v", p, err)
} else {
opts.PodContainerDir = p
}
}
if pod.Spec.DNSPolicy == api.DNSClusterFirst {
opts.DNS, opts.DNSSearch, err = kl.getClusterDNS(pod)
if err != nil {
return nil, err
}
}
return opts, nil
}
var masterServices = util.NewStringSet("kubernetes")
// getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
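// As a hypothetical example, a service named "redis-master" with a ClusterIP
// and port 6379 is typically projected into variables such as
// REDIS_MASTER_SERVICE_HOST and REDIS_MASTER_SERVICE_PORT (the exact set is
// produced by envvars.FromServices).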
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
var (
serviceMap = make(map[string]api.Service)
m = make(map[string]string)
)
// Get all service resources from the master (via a cache),
// and populate them into service environment variables.
if kl.serviceLister == nil {
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
return m, nil
}
services, err := kl.serviceLister.List()
if err != nil {
return m, fmt.Errorf("failed to list services when setting up env vars")
}
// project the services in namespace ns onto the master services
for _, service := range services.Items {
// ignore services where ClusterIP is "None" or empty
if !api.IsServiceIPSet(&service) {
continue
}
serviceName := service.Name
switch service.Namespace {
// for the case where the master service namespace is the namespace the pod
// is in, the pod should receive all the services in the namespace.
//
// ordering of the case clauses below enforces this
case ns:
serviceMap[serviceName] = service
case kl.masterServiceNamespace:
if masterServices.Has(serviceName) {
if _, exists := serviceMap[serviceName]; !exists {
serviceMap[serviceName] = service
}
}
}
}
services.Items = []api.Service{}
for _, service := range serviceMap {
services.Items = append(services.Items, service)
}
for _, e := range envvars.FromServices(&services) {
m[e.Name] = e.Value
}
return m, nil
}
// Make the service environment variables for a pod in the given namespace.
func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container) ([]kubecontainer.EnvVar, error) {
var result []kubecontainer.EnvVar
// Note: These are added to the docker.Config, but are not included in the checksum computed
// by dockertools.BuildDockerName(...). That way, we can still determine whether an
// api.Container is already running by its hash. (We don't want to restart a container just
// because some service changed.)
//
// Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
// To avoid this users can: (1) wait between starting a service and starting the pods that use it; or (2) detect
// missing service env var and exit and be restarted; or (3) use DNS instead of env vars
// and keep trying to resolve the DNS name of the service (recommended).
serviceEnv, err := kl.getServiceEnvVarMap(pod.Namespace)
if err != nil {
return result, err
}
// Determine the final values of variables:
//
// 1. Determine the final value of each variable:
// a. If the variable's Value is set, expand the `$(var)` references to other
// variables in the .Value field; the sources of variables are the declared
// variables of the container and the service environment variables
// b. If a source is defined for an environment variable, resolve the source
// 2. Create the container's environment in the order variables are declared
// 3. Add remaining service environment vars
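// As a hypothetical example of step 1a: given container env vars FOO with value
// "bar" and BAR with value "$(FOO)-baz", BAR expands to "bar-baz"; references to
// variables that cannot be resolved are left as-is by the mapping function.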
tmpEnv := make(map[string]string)
mappingFunc := expansion.MappingFuncFor(tmpEnv, serviceEnv)
for _, envVar := range container.Env {
// Accesses apiserver+Pods.
// So, the master may set service env vars, or kubelet may. In case both are doing
// it, we delete the key from the kubelet-generated ones so we don't have duplicate
// env vars.
// TODO: remove this next line once all platforms use apiserver+Pods.
delete(serviceEnv, envVar.Name)
runtimeVal := envVar.Value
if runtimeVal != "" {
// Step 1a: expand variable references
runtimeVal = expansion.Expand(runtimeVal, mappingFunc)
} else if envVar.ValueFrom != nil && envVar.ValueFrom.FieldRef != nil {
// Step 1b: resolve alternate env var sources
runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod)
if err != nil {
return result, err
}
}
tmpEnv[envVar.Name] = runtimeVal
result = append(result, kubecontainer.EnvVar{Name: envVar.Name, Value: tmpEnv[envVar.Name]})
}
// Append remaining service env vars.
for k, v := range serviceEnv {
result = append(result, kubecontainer.EnvVar{Name: k, Value: v})
}
return result, nil
}
func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod) (string, error) {
internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
if err != nil {
return "", err
}
return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath)
}
// getClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
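// For example (hypothetical values): with clusterDomain "cluster.local" and a pod
// in namespace "default", the generated search list begins with
// "default.svc.cluster.local", "svc.cluster.local", "cluster.local", followed by
// the search domains taken from the host's /etc/resolv.conf.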
func (kl *Kubelet) getClusterDNS(pod *api.Pod) ([]string, []string, error) {
// Get host DNS settings and append them to cluster DNS settings.
f, err := os.Open("/etc/resolv.conf")
if err != nil {
return nil, nil, err
}
defer f.Close()
hostDNS, hostSearch, err := parseResolvConf(f)
if err != nil {
return nil, nil, err
}
var dns, dnsSearch []string
if kl.clusterDNS != nil {
dns = append([]string{kl.clusterDNS.String()}, hostDNS...)
}
if kl.clusterDomain != "" {
nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
}
return dns, dnsSearch, nil
}
// Returns the list of DNS servers and DNS search domains.
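// For example (hypothetical contents): a resolv.conf with the lines
//   nameserver 10.0.0.10
//   nameserver 8.8.8.8
//   search default.svc.cluster.local svc.cluster.local
// yields nameservers ["10.0.0.10", "8.8.8.8"] and
// searches ["default.svc.cluster.local", "svc.cluster.local"].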
func parseResolvConf(reader io.Reader) (nameservers []string, searches []string, err error) {
file, err := ioutil.ReadAll(reader)
if err != nil {
return nil, nil, err
}
// Lines of the form "nameserver 1.2.3.4" accumulate.
nameservers = []string{}
// Lines of the form "search example.com" overrule - last one wins.
searches = []string{}
lines := strings.Split(string(file), "\n")
for l := range lines {
trimmed := strings.TrimSpace(lines[l])
if strings.HasPrefix(trimmed, "#") {
continue
}
fields := strings.Fields(trimmed)
if len(fields) == 0 {
continue
}
if fields[0] == "nameserver" {
nameservers = append(nameservers, fields[1:]...)
}
if fields[0] == "search" {
searches = fields[1:]
}
}
return nameservers, searches, nil
}
// Kill all running containers in a pod (includes the pod infra container).
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
return kl.containerRuntime.KillPod(pod)
}
type empty struct{}
// makePodDataDirs creates the data directories for the pod.
func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
uid := pod.UID
if err := os.Mkdir(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
return nil
}
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType SyncPodType) error {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
start := time.Now()
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[ConfigFirstSeenAnnotationKey]; !ok {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
} else {
firstSeenTime = kubeletTypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}
// Before returning, regenerate status and store it in the cache.
defer func() {
if isStaticPod(pod) && mirrorPod == nil {
// No need to cache the status because the mirror pod does not
// exist yet.
return
}
status, err := kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
podToUpdate := pod
if mirrorPod != nil {
podToUpdate = mirrorPod
}
existingStatus, ok := kl.statusManager.GetPodStatus(podFullName)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(podToUpdate, status)
}
}()
// Kill pods we can't run.
err := canRunPod(pod)
if err != nil {
kl.killPod(runningPod)
return err
}
if err := kl.makePodDataDirs(pod); err != nil {
glog.Errorf("Unable to make pod data directories for pod %q (uid %q): %v", podFullName, uid, err)
return err
}
// Starting phase:
ref, err := api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}
// Mount volumes.
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)
}
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
return err
}
kl.volumeManager.SetVolumes(pod.UID, podVolumes)
// The kubelet is the source of truth for pod status. It ignores the status sent from
// the apiserver and regenerates status for every pod update, incrementally updating
// the status it received at pod creation time.
//
// The container runtime needs 2 pieces of information from the status to sync a pod:
// The terminated state of containers (to restart them) and the podIp (for liveness probes).
// New pods don't have either, so we skip the expensive status generation step.
//
// If we end up here with a create event for an already running pod, it could result in a
// restart of its containers. This cannot happen unless the kubelet restarts, because the
// delete before the second create is processed by SyncPods, which cancels this pod worker.
//
// If the kubelet restarts, we have a bunch of running containers for which we get create
// events. This is ok, because the pod status for these will include the podIp and terminated
// status. Any race conditions here effectively boil down to -- the pod worker didn't sync
// state of a newly started container with the apiserver before the kubelet restarted, so
// it's OK to pretend like the kubelet started them after it restarted.
//
// Also note that deletes currently have an updateType of `create` set in UpdatePods.
// This, again, does not matter because deletes are not processed by this method.
var podStatus api.PodStatus
if updateType == SyncPodCreate {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
if !firstSeenTime.IsZero() {
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
podStatus = pod.Status
podStatus.StartTime = &util.Time{start}
kl.statusManager.SetPodStatus(pod, podStatus)
glog.V(3).Infof("Not generating pod status for new pod %q", podFullName)
} else {
var err error
podStatus, err = kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to get status for pod %q (uid %q): %v", podFullName, uid, err)
return err
}
}
pullSecrets, err := kl.getPullSecretsForPod(pod)
if err != nil {
glog.Errorf("Unable to get pull secrets for pod %q (uid %q): %v", podFullName, uid, err)
return err
}
err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets)
if err != nil {
return err
}
if isStaticPod(pod) {
if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
glog.Errorf("Deleting mirror pod %q because it is outdated", podFullName)
if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
glog.Errorf("Failed deleting mirror pod %q: %v", podFullName, err)
}
}
if mirrorPod == nil {
glog.V(3).Infof("Creating a mirror pod %q", podFullName)
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %v", podFullName, err)
}
// Pod status update is edge-triggered. If there is any update of the
// mirror pod, we need to delete the existing status associated with
// the static pod to trigger an update.
kl.statusManager.DeletePodStatus(podFullName)
}
}
return nil
}
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull secrets
// TODO transitively search through the referenced service account to find the required secrets
// TODO duplicate secrets are being retrieved multiple times and there is no cache. Creating and using a secret manager interface will make this easier to address.
func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
pullSecrets := []api.Secret{}
for _, secretRef := range pod.Spec.ImagePullSecrets {
secret, err := kl.kubeClient.Secrets(pod.Namespace).Get(secretRef.Name)
if err != nil {
return nil, err
}
pullSecrets = append(pullSecrets, *secret)
}
return pullSecrets, nil
}
// Stores all volumes defined by the set of pods into a map.
// Keys for each entry are in the format (POD_ID)/(VOLUME_NAME)
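// For example (hypothetical values), a pod with UID "1234" declaring a volume
// named "data" produces the key "1234/data".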
func getDesiredVolumes(pods []*api.Pod) map[string]api.Volume {
desiredVolumes := make(map[string]api.Volume)
for _, pod := range pods {
for _, volume := range pod.Spec.Volumes {
identifier := path.Join(string(pod.UID), volume.Name)
desiredVolumes[identifier] = volume
}
}
return desiredVolumes
}
func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error {
desired := util.NewStringSet()
for _, pod := range pods {
desired.Insert(string(pod.UID))
}
found, err := kl.listPodsFromDisk()
if err != nil {
return err
}
errlist := []error{}
for i := range found {
if !desired.Has(string(found[i])) {
glog.V(3).Infof("Orphaned pod %q found, removing", found[i])
if err := os.RemoveAll(kl.getPodDir(found[i])); err != nil {
errlist = append(errlist, err)
}
}
}
return utilErrors.NewAggregate(errlist)
}
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
desiredVolumes := getDesiredVolumes(pods)
currentVolumes := kl.getPodVolumesFromDisk()
runningSet := util.StringSet{}
for _, pod := range runningPods {
runningSet.Insert(string(pod.ID))
}
for name, vol := range currentVolumes {
if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/")
if runningSet.Has(parts[0]) {
glog.Infof("volume %q, still has a container running %q, skipping teardown", name, parts[0])
continue
}
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
//to be deleted and volumes that are leftover after a crash.
glog.Warningf("Orphaned volume %q found, tearing down volume", name)
// TODO(yifan): Refactor this hacky string manipulation.
kl.volumeManager.DeleteVolumes(types.UID(parts[0]))
//TODO (jonesdl) This should not block other kubelet synchronization procedures
err := vol.TearDown()
if err != nil {
glog.Errorf("Could not tear down volume %q: %v", name, err)
}
}
}
return nil
}
// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
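// For example (hypothetical values): a pod whose status StartTime is 10:00:00 and
// whose ActiveDeadlineSeconds is 600 is considered past its deadline at 10:10:00
// or later.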
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
now := util.Now()
if pod.Spec.ActiveDeadlineSeconds != nil {
podStatus, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
if !ok {
podStatus = pod.Status
}
if !podStatus.StartTime.IsZero() {
startTime := podStatus.StartTime.Time
duration := now.Time.Sub(startTime)
allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
if duration >= allowedDuration {
return true
}
}
}
return false
}
// podIsTerminated returns true if the status is in one of the terminated states.
func podIsTerminated(status *api.PodStatus) bool {
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
return true
}
return false
}
// Filter out pods in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) filterOutTerminatedPods(allPods []*api.Pod) []*api.Pod {
var pods []*api.Pod
for _, pod := range allPods {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(kubecontainer.GetPodFullName(pod))
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
status = pod.Status
}
if podIsTerminated(&status) {
continue
}
pods = append(pods, pod)
}
return pods
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType,
mirrorPods map[string]*api.Pod, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
// Remove obsolete entries in podStatus where the pod is no longer considered bound to this node.
podFullNames := make(map[string]bool)
for _, pod := range allPods {
podFullNames[kubecontainer.GetPodFullName(pod)] = true
}
kl.statusManager.RemoveOrphanedStatuses(podFullNames)
// Handles pod admission.
pods := kl.admitPods(allPods, podSyncTypes)
glog.V(4).Infof("Desired pods: %s", kubeletUtil.FormatPodNames(pods))
var err error
desiredPods := make(map[types.UID]empty)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Check for any containers that need starting
for _, pod := range pods {
podFullName := kubecontainer.GetPodFullName(pod)
uid := pod.UID
desiredPods[uid] = empty{}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, mirrorPods[podFullName], func() {
metrics.PodWorkerLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
// Note the number of containers for new pods.
if val, ok := podSyncTypes[pod.UID]; ok && (val == SyncPodCreate) {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
// Stop the workers for no-longer existing pods.
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
if !kl.sourcesReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
glog.V(4).Infof("Skipping deletes, sources aren't ready yet.")
return nil
}
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {
glog.Errorf("Failed killing unwanted containers: %v", err)
}
// Note that we just killed the unwanted pods. This may not have reflected
// in the cache. We need to bypass the cache to get the latest set of
// running pods to clean up the volumes.
// TODO: Evaluate the performance impact of bypassing the runtime cache.
runningPods, err = kl.containerRuntime.GetPods(false)
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Remove any orphaned volumes.
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove volumes associated with terminated but not yet
// deleted pods.
err = kl.cleanupOrphanedVolumes(allPods, runningPods)
if err != nil {
glog.Errorf("Failed cleaning up orphaned volumes: %v", err)
return err
}
// Remove any orphaned pod directories.
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove directories associated with terminated but not yet
// deleted pods.
err = kl.cleanupOrphanedPodDirs(allPods)
if err != nil {
glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
return err
}
// Remove any orphaned mirror pods.
kl.podManager.DeleteOrphanedMirrorPods()
return err
}
// killUnwantedPods kills the unwanted, running pods in parallel.
func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
runningPods []*kubecontainer.Pod) error {
ch := make(chan error, len(runningPods))
defer close(ch)
numWorkers := 0
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; found {
// Per-pod workers will handle the desired pods.
continue
}
numWorkers++
go func(pod *kubecontainer.Pod, ch chan error) {
var err error
defer func() {
ch <- err
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
err = kl.killPod(*pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
return
}
}(pod, ch)
}
// Aggregate errors from the pod killing workers.
var errs []error
for i := 0; i < numWorkers; i++ {
err := <-ch
if err != nil {
errs = append(errs, err)
}
}
return utilErrors.NewAggregate(errs)
}
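// The function above is a small fan-out/fan-in: one goroutine per unwanted pod
// writes its result into the buffered channel, and the caller drains exactly
// numWorkers results before aggregating. A minimal standalone sketch of the
// same pattern (illustrative only; Item and process are hypothetical names):
//
//	ch := make(chan error, len(items))
//	for _, it := range items {
//		go func(it Item) { ch <- process(it) }(it)
//	}
//	var errs []error
//	for range items {
//		if err := <-ch; err != nil {
//			errs = append(errs, err)
//		}
//	}
//	return utilErrors.NewAggregate(errs)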
type podsByCreationTime []*api.Pod
func (s podsByCreationTime) Len() int {
return len(s)
}
func (s podsByCreationTime) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s podsByCreationTime) Less(i, j int) bool {
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
// checkHostPortConflicts detects pods with conflicted host ports.
func checkHostPortConflicts(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
ports := util.StringSet{}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
for _, pod := range pods {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports); len(errs) != 0 {
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", kubecontainer.GetPodFullName(pod), errs)
notFitting = append(notFitting, pod)
continue
}
fitting = append(fitting, pod)
}
return
}
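// Illustrative example (not in the original source): if two pods both declare
// a container port such as
//
//	Ports: []api.ContainerPort{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}}
//
// the pod with the earlier CreationTimestamp claims the port and ends up in
// fitting; the later pod fails AccumulateUniqueHostPorts and is returned in
// notFitting.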
// checkCapacityExceeded detects pods that exceed the node's resources.
func (kl *Kubelet) checkCapacityExceeded(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
return pods, nil
}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
capacity := CapacityFromMachineInfo(info)
return predicates.CheckPodsExceedingCapacity(pods, capacity)
}
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
func (kl *Kubelet) handleOutOfDisk(pods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
if len(podSyncTypes) == 0 {
// regular sync. no new pods
return pods
}
outOfDockerDisk := false
outOfRootDisk := false
// Check disk space once globally and reject or accept all new pods.
withinBounds, err := kl.diskSpaceManager.IsDockerDiskSpaceAvailable()
// Assume enough space in case of errors.
if err == nil && !withinBounds {
outOfDockerDisk = true
}
withinBounds, err = kl.diskSpaceManager.IsRootDiskSpaceAvailable()
// Assume enough space in case of errors.
if err == nil && !withinBounds {
outOfRootDisk = true
}
// Kubelet would indicate all pods as newly created on the first run after restart.
// We ignore the first disk check to ensure that running pods are not killed.
// Disk manager will only declare out of disk problems if unfreeze has been called.
kl.diskSpaceManager.Unfreeze()
if !outOfDockerDisk && !outOfRootDisk {
// Disk space is fine.
return pods
}
var fitting []*api.Pod
for i := range pods {
pod := pods[i]
// Only reject pods that didn't start yet.
if podSyncTypes[pod.UID] == SyncPodCreate {
reason := "OutOfDisk"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to lack of disk space.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to lack of disk space."})
continue
}
fitting = append(fitting, pod)
}
return fitting
}
// checkNodeSelectorMatching detects pods that do not match the node's labels.
func (kl *Kubelet) checkNodeSelectorMatching(pods []*api.Pod) (fitting []*api.Pod, notFitting []*api.Pod) {
if kl.standaloneMode {
return pods, notFitting
}
node, err := kl.GetNode()
if err != nil {
glog.Errorf("error getting node: %v", err)
return pods, nil
}
for _, pod := range pods {
if !predicates.PodMatchesNodeLabels(pod, node) {
notFitting = append(notFitting, pod)
continue
}
fitting = append(fitting, pod)
}
return
}
// handleNotFittingPods handles pods that do not fit on the node and returns
// the pods that fit. It currently checks host port conflicts, node selector
// mismatches, and exceeded node capacity.
func (kl *Kubelet) handleNotFittingPods(pods []*api.Pod) []*api.Pod {
fitting, notFitting := checkHostPortConflicts(pods)
for _, pod := range notFitting {
reason := "HostPortConflict"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to host port conflict.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to host port conflict"})
}
fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
for _, pod := range notFitting {
reason := "NodeSelectorMismatching"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to node selector mismatch.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to node selector mismatch"})
}
fitting, notFitting = kl.checkCapacityExceeded(fitting)
for _, pod := range notFitting {
reason := "CapacityExceeded"
kl.recorder.Eventf(pod, reason, "Cannot start the pod due to exceeded capacity.")
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod cannot be started due to exceeded capacity"})
}
return fitting
}
// admitPods handles pod admission. It filters out terminated pods and pods
// that don't fit on the node, and may reject pods if the node is overcommitted.
func (kl *Kubelet) admitPods(allPods []*api.Pod, podSyncTypes map[types.UID]SyncPodType) []*api.Pod {
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave that state, regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
pods := kl.filterOutTerminatedPods(allPods)
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
// Reject pods that we cannot run.
// handleNotFittingPods relies on static information (e.g. immutable fields
// in the pod specs or machine information that doesn't change without
// rebooting), and the pods are sorted by immutable creation time. Hence it
// should only reject new pods without checking the pod sync types.
fitting := kl.handleNotFittingPods(pods)
// Reject new creation requests if diskspace is running low.
admittedPods := kl.handleOutOfDisk(fitting, podSyncTypes)
return admittedPods
}
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, it runs a sync against the desired state and the running state. If
// no changes are seen to the configuration, it synchronizes the last known desired
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
for {
kl.syncLoopIteration(updates, handler)
}
}
func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) {
kl.syncLoopMonitor.Store(time.Now())
if !kl.containerRuntimeUp() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
return
}
if !kl.doneNetworkConfigure() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
return
}
unsyncedPod := false
podSyncTypes := make(map[types.UID]SyncPodType)
select {
case u, ok := <-updates:
if !ok {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return
}
kl.podManager.UpdatePods(u, podSyncTypes)
unsyncedPod = true
kl.syncLoopMonitor.Store(time.Now())
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
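// Concretely, the 5ms case below acts as a debounce window: the loop keeps
// draining the updates channel until it has been idle for 5ms, so a burst of
// updates is coalesced into a single SyncPods call instead of one per update.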
for unsyncedPod {
select {
case u := <-updates:
kl.podManager.UpdatePods(u, podSyncTypes)
kl.syncLoopMonitor.Store(time.Now())
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
}
}
pods, mirrorPods := kl.podManager.GetPodsAndMirrorMap()
kl.syncLoopMonitor.Store(time.Now())
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
kl.syncLoopMonitor.Store(time.Now())
}
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
val := kl.syncLoopMonitor.Load()
if val == nil {
return time.Time{}
}
return val.(time.Time)
}
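// Illustrative use (not part of the original source): a health check could
// treat a stale sync loop as unhealthy, for example:
//
//	if time.Since(kl.LatestLoopEntryTime()) > 2*kl.resyncInterval {
//		// report the sync loop as stuck
//	}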
// GetContainerRuntimeVersion returns the container runtime version for this Kubelet.
func (kl *Kubelet) GetContainerRuntimeVersion() (kubecontainer.Version, error) {
if kl.containerRuntime == nil {
return nil, fmt.Errorf("no container runtime")
}
return kl.containerRuntime.Version()
}
func (kl *Kubelet) validatePodPhase(podStatus *api.PodStatus) error {
switch podStatus.Phase {
case api.PodRunning, api.PodSucceeded, api.PodFailed:
return nil
}
return fmt.Errorf("pod is not in 'Running', 'Succeeded' or 'Failed' state - State: %q", podStatus.Phase)
}
func (kl *Kubelet) validateContainerStatus(podStatus *api.PodStatus, containerName string, previous bool) (containerID string, err error) {
var cID string
cStatus, found := api.GetContainerStatus(podStatus.ContainerStatuses, containerName)
if !found {
return "", fmt.Errorf("container %q not found in pod", containerName)
}
if previous {
if cStatus.LastTerminationState.Terminated == nil {
return "", fmt.Errorf("previous terminated container %q not found in pod", containerName)
}
cID = cStatus.LastTerminationState.Terminated.ContainerID
} else {
if cStatus.State.Waiting != nil {
return "", fmt.Errorf("container %q is in waiting state.", containerName)
}
cID = cStatus.ContainerID
}
return kubecontainer.TrimRuntimePrefix(cID), nil
}
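// Illustrative note (added for clarity): with previous=false the ID of the
// current container instance is returned; with previous=true the ID comes from
// LastTerminationState, i.e. the most recently terminated instance, which is
// what log requests for a "previous" container rely on.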
// GetKubeletContainerLogs returns logs from the container
// TODO: this method returns logs of a random container attempt, when it should return the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow, previous bool, stdout, stderr io.Writer) error {
// TODO(vmarmol): Refactor to not need the pod status and verification.
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
podStatus, found := kl.statusManager.GetPodStatus(podFullName)
if !found {
return fmt.Errorf("failed to get status for pod %q", podFullName)
}
if err := kl.validatePodPhase(&podStatus); err != nil {
// No log is available if pod is not in a "known" phase (e.g. Unknown).
return err
}
containerID, err := kl.validateContainerStatus(&podStatus, containerName, previous)
if err != nil {
// No log is available if the container status is missing or is in the
// waiting state.
return err
}
pod, ok := kl.GetPodByFullName(podFullName)
if !ok {
return fmt.Errorf("unable to get logs for container %q in pod %q: unable to find pod", containerName, podFullName)
}
return kl.containerRuntime.GetContainerLogs(pod, containerID, tail, follow, stdout, stderr)
}
// GetHostname returns the hostname as the kubelet sees it.
func (kl *Kubelet) GetHostname() string {
return kl.hostname
}
// GetHostIP returns the host's IP address, or an error if it cannot be determined.
func (kl *Kubelet) GetHostIP() (net.IP, error) {
node, err := kl.GetNode()
if err != nil {
return nil, fmt.Errorf("cannot get node: %v", err)
}
return nodeutil.GetNodeHostIP(node)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pods.
func (kl *Kubelet) GetPods() []*api.Pod {
return kl.podManager.GetPods()
}
// GetRunningPods returns all pods running on kubelet from looking at the
// container runtime cache. This function converts kubecontainer.Pod to
// api.Pod, so only the fields that exist in both kubecontainer.Pod and
// api.Pod are considered meaningful.
func (kl *Kubelet) GetRunningPods() ([]*api.Pod, error) {
pods, err := kl.runtimeCache.GetPods()
if err != nil {
return nil, err
}
apiPods := make([]*api.Pod, 0, len(pods))
for _, pod := range pods {
apiPods = append(apiPods, pod.ToAPIPod())
}
return apiPods, nil
}
func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
return kl.podManager.GetPodByFullName(podFullName)
}
// GetPodByName provides the first pod that matches namespace and name, as well
// as whether the pod was found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.Pod, bool) {
return kl.podManager.GetPodByName(namespace, name)
}
func (kl *Kubelet) updateRuntimeUp() {
start := time.Now()
err := waitUntilRuntimeIsUp(kl.containerRuntime, 100*time.Millisecond)
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
if err == nil {
kl.lastTimestampRuntimeUp = time.Now()
} else {
glog.Errorf("Container runtime sanity check failed after %v, err: %v", time.Since(start), err)
}
}
func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
if podCIDR == "" {
glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
return nil
}
glog.V(5).Infof("PodCIDR is set to %q", podCIDR)
_, cidr, err := net.ParseCIDR(podCIDR)
if err != nil {
return err
}
// Set cbr0 interface address to first address in IPNet
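// Worked example (added for clarity): with podCIDR "10.244.1.0/24",
// net.ParseCIDR yields cidr.IP == 10.244.1.0; bumping the last octet below
// gives 10.244.1.1, which ensureCbr0 assigns to the cbr0 bridge. Note that
// the To4() call assumes an IPv4 pod CIDR.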
cidr.IP.To4()[3] += 1
if err := ensureCbr0(cidr); err != nil {
return err
}
return nil
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
func (kl *Kubelet) recordNodeStatusEvent(event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event)
}
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
func (kl *Kubelet) syncNetworkStatus() {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
networkConfigured := true
if kl.configureCBR0 {
if err := ensureIPTablesMasqRule(); err != nil {
networkConfigured = false
glog.Errorf("Error on adding ip table rules: %v", err)
}
if len(kl.podCIDR) == 0 {
glog.Warningf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
networkConfigured = false
} else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
networkConfigured = false
glog.Errorf("Error configuring cbr0: %v", err)
}
}
kl.networkConfigured = networkConfigured
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(kl.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addr.String()}}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}}
}
}
}
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(kl.pods), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = CapacityFromMachineInfo(info)
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.pods), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, "rebooted",
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
// TODO: Determine whether the runtime is docker or rocket
node.Status.NodeInfo.ContainerRuntimeVersion = "docker://" + verinfo.DockerVersion
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
// Check whether container runtime can be reported as up.
containerRuntimeUp := kl.containerRuntimeUp()
// Check whether network is configured properly
networkConfigured := kl.doneNetworkConfigure()
currentTime := util.Now()
var newNodeReadyCondition api.NodeCondition
var oldNodeReadyConditionStatus api.ConditionStatus
if containerRuntimeUp && networkConfigured {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
var reasons []string
if !containerRuntimeUp {
reasons = append(reasons, "container runtime is down")
}
if !networkConfigured {
reasons = append(reasons, "network not configured correctly")
}
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: strings.Join(reasons, ","),
LastHeartbeatTime: currentTime,
}
}
updated := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
oldNodeReadyConditionStatus = node.Status.Conditions[i].Status
if oldNodeReadyConditionStatus == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
}
node.Status.Conditions[i] = newNodeReadyCondition
updated = true
}
}
if !updated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if !updated || oldNodeReadyConditionStatus != newNodeReadyCondition.Status {
if newNodeReadyCondition.Status == api.ConditionTrue {
kl.recordNodeStatusEvent("NodeReady")
} else {
kl.recordNodeStatusEvent("NodeNotReady")
}
}
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
kl.recordNodeStatusEvent("NodeNotSchedulable")
} else {
kl.recordNodeStatusEvent("NodeSchedulable")
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
func (kl *Kubelet) containerRuntimeUp() bool {
kl.runtimeMutex.Lock()
defer kl.runtimeMutex.Unlock()
return kl.lastTimestampRuntimeUp.Add(kl.runtimeUpThreshold).After(time.Now())
}
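// Illustrative example (not part of the original source): with a
// runtimeUpThreshold of, say, 30s, the runtime is reported up only if
// updateRuntimeUp last succeeded within the past 30 seconds:
//
//	lastTimestampRuntimeUp.Add(30*time.Second).After(time.Now()) // true => up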
func (kl *Kubelet) doneNetworkConfigure() bool {
kl.networkConfigMutex.Lock()
defer kl.networkConfigMutex.Unlock()
return kl.networkConfigured
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
kl.networkConfigMutex.Lock()
kl.podCIDR = node.Spec.PodCIDR
kl.networkConfigMutex.Unlock()
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Nodes().UpdateStatus(node)
return err
}
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with third-party kubelet
// implementations such as kubernetes-mesos.
func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
running := 0
waiting := 0
stopped := 0
failed := 0
succeeded := 0
unknown := 0
for _, container := range spec.Containers {
if containerStatus, ok := api.GetContainerStatus(info, container.Name); ok {
if containerStatus.State.Running != nil {
running++
} else if containerStatus.State.Terminated != nil {
stopped++
if containerStatus.State.Terminated.ExitCode == 0 {
succeeded++
} else {
failed++
}
} else if containerStatus.State.Waiting != nil {
waiting++
} else {
unknown++
}
} else {
unknown++
}
}
switch {
case waiting > 0:
glog.V(5).Infof("pod waiting > 0, pending")
// One or more containers have not been started
return api.PodPending
case running > 0 && unknown == 0:
// All containers have been started, and at least
// one container is running
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
// All containers are terminated
if spec.RestartPolicy == api.RestartPolicyAlways {
// All containers are in the process of restarting
return api.PodRunning
}
if stopped == succeeded {
// RestartPolicy is not Always, and all
// containers are terminated in success
return api.PodSucceeded
}
if spec.RestartPolicy == api.RestartPolicyNever {
// RestartPolicy is Never, and all containers are
// terminated with at least one in failure
return api.PodFailed
}
// RestartPolicy is OnFailure, and at least one in failure
// and in the process of restarting
return api.PodRunning
default:
glog.V(5).Infof("pod default case, pending")
return api.PodPending
}
}
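// Worked example (added for clarity, not in the original source): a pod with
// RestartPolicy=Never and two containers, one terminated with ExitCode 0 and
// one with ExitCode 1, yields running=0, stopped=2, succeeded=1, failed=1,
// unknown=0. The "all containers terminated" case applies, stopped != succeeded,
// and the policy is Never, so GetPhase returns api.PodFailed.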
// getPodReadyCondition returns a ready condition if all containers in a pod are ready, else it returns an unready condition.
func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []api.PodCondition {
ready := []api.PodCondition{{
Type: api.PodReady,
Status: api.ConditionTrue,
}}
unready := []api.PodCondition{{
Type: api.PodReady,
Status: api.ConditionFalse,
}}
if statuses == nil {
return unready
}
for _, container := range spec.Containers {
if containerStatus, ok := api.GetContainerStatus(statuses, container.Name); ok {
if !containerStatus.Ready {
return unready
}
} else {
return unready
}
}
return ready
}
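// Illustrative example (not part of the original source): if the spec lists
// containers "app" and "sidecar" and the statuses report Ready=true only for
// "app", the unready condition is returned; the ready condition requires every
// container in the spec to appear in statuses with Ready=true.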
// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
start := time.Now()
defer func() {
metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
}()
podFullName := kubecontainer.GetPodFullName(pod)
glog.V(3).Infof("Generating status for %q", podFullName)
// TODO: Consider including the container information.
if kl.pastActiveDeadline(pod) {
reason := "DeadlineExceeded"
kl.recorder.Eventf(pod, reason, "Pod was active on the node longer than specified deadline")
return api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod was active on the node longer than specified deadline"}, nil
}
spec := &pod.Spec
podStatus, err := kl.containerRuntime.GetPodStatus(pod)
if err != nil {
// Error handling
glog.Infof("Query container info for pod %q failed with error (%v)", podFullName, err)
if strings.Contains(err.Error(), "resource temporarily unavailable") {
// Leave upstream layer to decide what to do
return api.PodStatus{}, err
}
pendingStatus := api.PodStatus{
Phase: api.PodPending,
Reason: "GeneralError",
Message: fmt.Sprintf("Query container info failed with error (%v)", err),
}
return pendingStatus, nil
}
// Assume info is ready to process
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
for _, c := range spec.Containers {
for i, st := range podStatus.ContainerStatuses {
if st.Name == c.Name {
ready := st.State.Running != nil && kl.readinessManager.GetReadiness(kubecontainer.TrimRuntimePrefix(st.ContainerID))
podStatus.ContainerStatuses[i].Ready = ready
break
}
}
}
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...)
if !kl.standaloneMode {
hostIP, err := kl.GetHostIP()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {
podStatus.HostIP = hostIP.String()
if pod.Spec.HostNetwork && podStatus.PodIP == "" {
podStatus.PodIP = hostIP.String()
}
}
}
return *podStatus, nil
}
// ServeLogs returns logs of the current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve
kl.logServer.ServeHTTP(w, req)
}
// findContainer finds and returns the container with the given pod ID, full name, and container name.
// It returns nil if not found.
func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) {
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return nil, err
}
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
return pod.FindContainerByName(containerName), nil
}
// RunInContainer runs a command in a container and returns the combined stdout and stderr as a byte slice.
func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containerName string, cmd []string) ([]byte, error) {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.RunInContainer(string(container.ID), cmd)
}
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.ExecInContainer(string(container.ID), cmd, stdin, stdout, stderr, tty)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
podUID = kl.podManager.TranslatePodUID(podUID)
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
}
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
}
return kl.runner.PortForward(&pod, port, stream)
}
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
kl.recorder.Eventf(kl.nodeRef, "starting", "Starting kubelet.")
}
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
}
// GetContainerInfo returns stats (from cAdvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {
podUID = kl.podManager.TranslatePodUID(podUID)
pods, err := kl.runtimeCache.GetPods()
if err != nil {
return nil, err
}
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
container := pod.FindContainerByName(containerName)
if container == nil {
return nil, ErrContainerNotFound
}
ci, err := kl.cadvisor.DockerContainer(string(container.ID), req)
if err != nil {
return nil, err
}
return &ci, nil
}
// GetRawContainerInfo returns stats (from cAdvisor) for a non-Kubernetes container.
func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error) {
if subcontainers {
return kl.cadvisor.SubcontainerInfo(containerName, req)
} else {
containerInfo, err := kl.cadvisor.ContainerInfo(containerName, req)
if err != nil {
return nil, err
}
return map[string]*cadvisorApi.ContainerInfo{
containerInfo.Name: containerInfo,
}, nil
}
}
// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
if kl.machineInfo == nil {
info, err := kl.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
kl.machineInfo = info
}
return kl.machineInfo, nil
}
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *TLSOptions, enableDebuggingHandlers bool) {
ListenAndServeKubeletServer(kl, address, port, tlsOptions, enableDebuggingHandlers)
}
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
ListenAndServeKubeletReadOnlyServer(kl, address, port)
}
// GetRuntime returns the current Runtime implementation in use by the kubelet. This func
// is exported to simplify integration with third party kubelet extensions (e.g. kubernetes-mesos).
func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}