1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
kubelet.go 114.76 KB
一键复制 编辑 原始数据 按行查看 历史
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fieldpath"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/clock"
utilconfig "k8s.io/kubernetes/pkg/util/config"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/integer"
kubeio "k8s.io/kubernetes/pkg/util/io"
utilipt "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
utilvalidation "k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/third_party/forked/golang/expansion"
)
const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 5 * time.Minute
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
// Location of container logs.
containerLogsDir = "/var/log/containers"
// max backoff period, exported for the e2e test
MaxContainerBackOff = 300 * time.Second
// Capacity of the channel for storing pods to kill. A small number should
// suffice because a goroutine is dedicated to check the channel and does
// not block on anything else.
podKillingChannelCapacity = 50
// Period for performing global cleanup tasks.
housekeepingPeriod = time.Second * 2
// Period for performing eviction monitoring.
// TODO ensure this is in sync with internal cadvisor housekeeping.
evictionMonitoringPeriod = time.Second * 10
// The path in containers' filesystems where the hosts file is mounted.
etcHostsPath = "/etc/hosts"
// Capacity of the channel for receiving pod lifecycle events. This number
// is a bit arbitrary and may be adjusted in the future.
plegChannelCapacity = 1000
// Generic PLEG relies on relisting for discovering container events.
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (e.g., container runtime operations),
// leading to higher cpu usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
// container restarts and image pulls.
backOffPeriod = time.Second * 10
// Period for performing container garbage collection.
ContainerGCPeriod = time.Minute
// Period for performing image garbage collection.
ImageGCPeriod = 5 * time.Minute
// Minimum number of dead containers to keep in a pod
minDeadContainerInPod = 1
)
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*api.Pod)
HandlePodUpdates(pods []*api.Pod)
HandlePodRemoves(pods []*api.Pod)
HandlePodReconcile(pods []*api.Pod)
HandlePodSyncs(pods []*api.Pod)
HandlePodCleanups() error
}
// Option is a functional option type for Kubelet
type Option func(*Kubelet)
// bootstrapping interface for kubelet, targets the initialization protocol
type KubeletBootstrap interface {
GetConfiguration() componentconfig.KubeletConfiguration
BirthCry()
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
ListenAndServeReadOnly(address net.IP, port uint)
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
// create and initialize a Kubelet instance
type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error)
// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type KubeletDeps struct {
// TODO(mtaufen): KubeletBuilder:
// Mesos currently uses this as a hook to let them make their own call to
// let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with
// their own KubeletBootstrap. It's a useful hook. I need to think about what
// a nice home for it would be. There seems to be a trend, between this and
// the Options fields below, of providing hooks where you can add extra functionality
// to the Kubelet for your solution. Maybe we should centralize these sorts of things?
Builder KubeletBuilder
// TODO(mtaufen): ContainerRuntimeOptions and Options:
// Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
// seem like a difficult path to trace when it's time to debug something.
// I'm leaving these fields here for now, but there is likely an easier-to-follow
// way to support their intended use cases. E.g. ContainerRuntimeOptions
// is used by Mesos to set an environment variable in containers which has
// some connection to their container GC. It seems that Mesos intends to use
// Options to add additional node conditions that are updated as part of the
// Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
// We should think about providing more explicit ways of doing these things.
ContainerRuntimeOptions []kubecontainer.Option
Options []Option
// Injected Dependencies
Auth server.AuthInterface
CAdvisorInterface cadvisor.Interface
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
DockerClient dockertools.DockerInterface
EventClient *clientset.Clientset
KubeClient *clientset.Clientset
Mounter mount.Interface
NetworkPlugins []network.NetworkPlugin
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
PodConfig *config.PodConfig
Recorder record.EventRecorder
Writer kubeio.Writer
VolumePlugins []volume.VolumePlugin
TLSOptions *server.TLSOptions
}
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName string) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if kubeCfg.ManifestURLHeader != "" {
pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// define file config source
if kubeCfg.PodManifestPath != "" {
glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.ManifestURL != "" {
glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
}
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
if kubeCfg.RootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
}
if kubeCfg.SyncFrequency.Duration <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
}
if kubeCfg.MakeIPTablesUtilChains {
if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
}
if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
}
if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
}
}
hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
// Query the cloud provider for our node name, default to hostname
nodeName := hostname
if kubeDeps.Cloud != nil {
var err error
instances, ok := kubeDeps.Cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
nodeName, err = instances.CurrentNodeName(hostname)
if err != nil {
return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
}
glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
}
// TODO: KubeletDeps.KubeClient should be a client interface, but client interface misses certain methods
// used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
// a nil pointer to it when what we really want is a nil interface.
var kubeClient clientset.Interface
if kubeDeps.KubeClient != nil {
kubeClient = kubeDeps.KubeClient
// TODO: remove this when we've refactored kubelet to only use clientset.
}
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
if err != nil {
return nil, err
}
}
containerGCPolicy := kubecontainer.ContainerGCPolicy{
MinAge: kubeCfg.MinimumGCAge.Duration,
MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
MaxContainers: int(kubeCfg.MaxContainerCount),
}
daemonEndpoints := &api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: kubeCfg.Port},
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
diskSpacePolicy := DiskSpacePolicy{
DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
RootFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
}
thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
}
reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
if err != nil {
return nil, err
}
var dockerExecHandler dockertools.ExecHandler
switch kubeCfg.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
listWatch := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().Services(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().Services(api.NamespaceAll).Watch(options)
},
}
cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
}
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{api.ObjectNameField: nodeName}.AsSelector()
listWatch := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return kubeClient.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return kubeClient.Core().Nodes().Watch(options)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
}
nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}
// TODO: get the real node object of ourself,
// and use the real node name and UID.
// TODO: what is namespace for node?
nodeRef := &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
containerRefManager := kubecontainer.NewRefManager()
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
// TODO(mtaufen): remove when internal cbr0 implementation gets removed in favor
// of the kubenet network plugin
var myConfigureCBR0 bool = kubeCfg.ConfigureCBR0
var myFlannelExperimentalOverlay bool = kubeCfg.ExperimentalFlannelOverlay
if kubeCfg.NetworkPluginName == "kubenet" {
myConfigureCBR0 = false
myFlannelExperimentalOverlay = false
}
klet := &Kubelet{
hostname: hostname,
nodeName: nodeName,
dockerClient: kubeDeps.DockerClient,
kubeClient: kubeClient,
rootDirectory: kubeCfg.RootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
containerRefManager: containerRefManager,
httpClient: &http.Client{},
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: kubeCfg.RegisterNode,
registerSchedulable: kubeCfg.RegisterSchedulable,
standaloneMode: standaloneMode,
clusterDomain: kubeCfg.ClusterDomain,
clusterDNS: net.ParseIP(kubeCfg.ClusterDNS),
serviceLister: serviceLister,
nodeLister: nodeLister,
nodeInfo: nodeInfo,
masterServiceNamespace: kubeCfg.MasterServiceNamespace,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
diskSpaceManager: diskSpaceManager,
cloud: kubeDeps.Cloud,
autoDetectCloudProvider: (kubeExternal.AutoDetectCloudProvider == kubeCfg.CloudProvider),
nodeRef: nodeRef,
nodeLabels: kubeCfg.NodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer,
configureCBR0: myConfigureCBR0,
nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
reconcileCIDR: kubeCfg.ReconcileCIDR,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
nvidiaGPUs: int(kubeCfg.NvidiaGPUs),
syncLoopMonitor: atomic.Value{},
resolverConfig: kubeCfg.ResolverConfig,
cpuCFSQuota: kubeCfg.CPUCFSQuota,
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
flannelExperimentalOverlay: myFlannelExperimentalOverlay,
flannelHelper: nil,
nodeIP: net.ParseIP(kubeCfg.NodeIP),
clock: clock.RealClock{},
outOfDiskTransitionFrequency: kubeCfg.OutOfDiskTransitionFrequency.Duration,
reservation: *reservation,
enableCustomMetrics: kubeCfg.EnableCustomMetrics,
babysitDaemons: kubeCfg.BabysitDaemons,
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
}
if klet.flannelExperimentalOverlay {
klet.flannelHelper = NewFlannelHelper()
glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
}
if klet.nodeIP != nil {
if err := klet.validateNodeIP(); err != nil {
return nil, err
}
glog.Infof("Using node IP: %q", klet.nodeIP.String())
}
if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.ConfigureCBR0, kubeCfg.NetworkPluginName); err != nil {
// This is a non-recoverable error. Returning it up the callstack will just
// lead to retries of the same failure, so just fail hard.
glog.Fatalf("Invalid hairpin mode: %v", err)
} else {
klet.hairpinMode = mode
}
glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}
machineInfo, err := klet.GetCachedMachineInfo()
if err != nil {
return nil, err
}
procFs := procfs.NewProcFS()
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
if kubeCfg.RemoteRuntimeEndpoint != "" {
kubeCfg.ContainerRuntime = "remote"
// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
if kubeCfg.RemoteImageEndpoint == "" {
kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
}
}
// Initialize the runtime.
switch kubeCfg.ContainerRuntime {
case "docker":
// Only supported one for now, continue.
klet.containerRuntime = dockertools.NewDockerManager(
kubeDeps.DockerClient,
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
klet.podManager,
machineInfo,
kubeCfg.PodInfraContainerImage,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
containerLogsDir,
kubeDeps.OSInterface,
klet.networkPlugin,
klet,
klet.httpClient,
dockerExecHandler,
kubeDeps.OOMAdjuster,
procFs,
klet.cpuCFSQuota,
imageBackOff,
kubeCfg.SerializeImagePulls,
kubeCfg.EnableCustomMetrics,
// If using "kubenet", the Kubernetes network plugin that wraps
// CNI's bridge plugin, it knows how to set the hairpin veth flag
// so we tell the container runtime to back away from setting it.
// If the kubelet is started with any other plugin we can't be
// sure it handles the hairpin case so we instruct the docker
// runtime to set the flag instead.
klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
kubeCfg.SeccompProfileRoot,
kubeDeps.ContainerRuntimeOptions...,
)
case "rkt":
// TODO: Include hairpin mode settings in rkt?
conf := &rkt.Config{
Path: kubeCfg.RktPath,
Stage1Image: kubeCfg.RktStage1Image,
InsecureOptions: "image,ondisk",
}
rktRuntime, err := rkt.New(
kubeCfg.RktAPIEndpoint,
conf,
klet,
kubeDeps.Recorder,
containerRefManager,
klet.podManager,
klet.livenessManager,
klet.httpClient,
klet.networkPlugin,
klet.hairpinMode == componentconfig.HairpinVeth,
utilexec.New(),
kubecontainer.RealOS{},
imageBackOff,
kubeCfg.SerializeImagePulls,
kubeCfg.RuntimeRequestTimeout.Duration,
)
if err != nil {
return nil, err
}
klet.containerRuntime = rktRuntime
case "remote":
remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
kubeDeps.OSInterface,
klet.networkPlugin,
klet,
klet.httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
klet.cpuCFSQuota,
remoteRuntimeService,
remoteImageService,
)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
}
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.updatePodCIDR(kubeCfg.PodCIDR)
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
klet.runner = klet.containerRuntime
klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.runner,
containerRefManager,
kubeDeps.Recorder)
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
if err != nil {
return nil, err
}
// setup volumeManager
klet.volumeManager, err = volumemanager.NewVolumeManager(
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
kubeDeps.Mounter,
klet.getPodsDir(),
kubeDeps.Recorder)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
// setup eviction manager
evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock)
if err != nil {
return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
}
klet.evictionManager = evictionManager
klet.AddPodAdmitHandler(evictionAdmitHandler)
// add sysctl admission
runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
if err != nil {
return nil, err
}
safeWhitelist, err := sysctl.NewWhitelist(sysctl.SafeSysctlWhitelist(), api.SysctlsPodAnnotationKey)
if err != nil {
return nil, err
}
// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
// Hence, we concatenate those two lists.
safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, api.UnsafeSysctlsPodAnnotationKey)
if err != nil {
return nil, err
}
klet.AddPodAdmitHandler(runtimeSupport)
klet.AddPodAdmitHandler(safeWhitelist)
klet.AddPodAdmitHandler(unsafeWhitelist)
// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
klet.appArmorValidator = apparmor.NewValidator(kubeCfg.ContainerRuntime)
klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
// apply functional Option's
for _, opt := range kubeDeps.Options {
opt(klet)
}
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
return klet, nil
}
type serviceLister interface {
List() (api.ServiceList, error)
}
type nodeLister interface {
List() (machines api.NodeList, err error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
kubeletConfiguration componentconfig.KubeletConfiguration
hostname string
nodeName string
dockerClient dockertools.DockerInterface
runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface
iptClient utilipt.Interface
rootDirectory string
// podWorkers handle syncing Pods in response to events.
podWorkers PodWorkers
// resyncInterval is the interval between periodic full reconciliations of
// pods on this node.
resyncInterval time.Duration
// sourcesReady records the sources seen by the kubelet, it is thread-safe.
sourcesReady config.SourcesReady
// podManager is a facade that abstracts away the various sources of pods
// this Kubelet services.
podManager kubepod.Manager
// Needed to observe and respond to situations that could impact node stability
evictionManager eviction.Manager
// Needed to report events for containers belonging to deleted/modified pods.
// Tracks references for reporting events
containerRefManager *kubecontainer.RefManager
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
// Optional, defaults to simple Docker implementation
runner kubecontainer.ContainerCommandRunner
// Optional, client for http requests, defaults to empty client
httpClient kubetypes.HttpGetter
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// Set to true to have the node register itself as schedulable.
registerSchedulable bool
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
// Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
standaloneMode bool
// If non-empty, use this for container DNS search.
clusterDomain string
// If non-nil, use this for container DNS server.
clusterDNS net.IP
// masterServiceNamespace is the namespace that the master service is exposed in.
masterServiceNamespace string
// serviceLister knows how to list services
serviceLister serviceLister
// nodeLister knows how to list nodes
nodeLister nodeLister
// nodeInfo knows how to get information about the node for this kubelet.
nodeInfo predicates.NodeInfo
// a list of node labels to register
nodeLabels map[string]string
// Last timestamp when runtime responded on ping.
// Mutex is used to protect this value.
runtimeState *runtimeState
// Volume plugins.
volumePluginMgr *volume.VolumePluginMgr
// Network plugin.
networkPlugin network.NetworkPlugin
// Handles container probing.
probeManager prober.Manager
// Manages container health check results.
livenessManager proberesults.Manager
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
streamingConnectionIdleTimeout time.Duration
// The EventRecorder to use
recorder record.EventRecorder
// Policy for handling garbage collection of dead containers.
containerGC kubecontainer.ContainerGC
// Manager for image garbage collection.
imageManager images.ImageGCManager
// Diskspace manager.
diskSpaceManager diskSpaceManager
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorapi.MachineInfo
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager status.Manager
// VolumeManager runs a set of asynchronous loops that figure out which
// volumes need to be attached/mounted/unmounted/detached based on the pods
// scheduled on this node and makes it so.
volumeManager volumemanager.VolumeManager
// Cloud provider interface.
cloud cloudprovider.Interface
autoDetectCloudProvider bool
// Reference to this node.
nodeRef *api.ObjectReference
// Container runtime.
containerRuntime kubecontainer.Runtime
// reasonCache caches the failure reason of the last creation of all containers, which is
// used for generating ContainerStatus.
reasonCache *ReasonCache
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator
// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
// os is a facade for various syscalls that need to be mocked during testing.
os kubecontainer.OSInterface
// Watcher of out of memory events.
oomWatcher OOMWatcher
// Monitor resource usage
resourceAnalyzer stats.ResourceAnalyzer
// Whether or not we should have the QOS cgroup hierarchy for resource management
cgroupsPerQOS bool
// If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string
// Mounter to use for volumes.
mounter mount.Interface
// Writer interface to use for volumes.
writer kubeio.Writer
// Manager of non-Runtime containers.
containerManager cm.ContainerManager
nodeConfig cm.NodeConfig
// Whether or not kubelet should take responsibility for keeping cbr0 in
// the correct state.
configureCBR0 bool
reconcileCIDR bool
// Traffic to IPs outside this range will use IP masquerade.
nonMasqueradeCIDR string
// Maximum Number of Pods which can be run by this Kubelet
maxPods int
// Number of NVIDIA GPUs on this node
nvidiaGPUs int
// Monitor Kubelet's sync loop
syncLoopMonitor atomic.Value
// Container restart Backoff
backOff *flowcontrol.Backoff
// Channel for sending pods to kill.
podKillingCh chan *kubecontainer.PodPair
// The configuration file used as the base to generate the container's
// DNS resolver configuration file. This can be used in conjunction with
// clusterDomain and clusterDNS.
resolverConfig string
// Optionally shape the bandwidth of a pod
// TODO: remove when kubenet plugin is ready
shaper bandwidth.BandwidthShaper
// True if container cpu limits should be enforced via cgroup CFS quota
cpuCFSQuota bool
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *api.NodeDaemonEndpoints
// A queue used to trigger pod workers.
workQueue queue.WorkQueue
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
// flannelExperimentalOverlay determines whether the experimental flannel
// network overlay is active.
flannelExperimentalOverlay bool
// TODO: Flannelhelper doesn't store any state, we can instantiate it
// on the fly if we're confident the dbus connetions it opens doesn't
// put the system under duress.
flannelHelper *FlannelHelper
// If non-nil, use this IP address for the node
nodeIP net.IP
// clock is an interface that provides time related functionality in a way that makes it
// easy to test the code.
clock clock.Clock
// outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually
// not out of disk before it can transition the node condition status from out-of-disk to
// not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
// getting rescheduled onto the node.
outOfDiskTransitionFrequency time.Duration
// reservation specifies resources which are reserved for non-pod usage, including kubernetes and
// non-kubernetes system processes.
reservation kubetypes.Reservation
// support gathering custom metrics.
enableCustomMetrics bool
// How the Kubelet should setup hairpin NAT. Can take the values: "promiscuous-bridge"
// (make cbr0 promiscuous), "hairpin-veth" (set the hairpin flag on veth interfaces)
// or "none" (do nothing).
hairpinMode componentconfig.HairpinMode
// The node has babysitter process monitoring docker and kubelet
babysitDaemons bool
// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(*api.Node) error
// TODO: think about moving this to be centralized in PodWorkers in follow-on.
// the list of handlers to call during pod admission.
lifecycle.PodAdmitHandlers
// the list of handlers to call during pod sync loop.
lifecycle.PodSyncLoopHandlers
// the list of handlers to call during pod sync.
lifecycle.PodSyncHandlers
// the number of allowed pods per core
podsPerCore int
// enableControllerAttachDetach indicates the Attach/Detach controller
// should manage attachment/detachment of volumes scheduled to this node,
// and disable kubelet from executing any attach/detach operations
enableControllerAttachDetach bool
// trigger deleting containers in a pod
containerDeletor *podContainerDeletor
// config iptables util rules
makeIPTablesUtilChains bool
// The bit of the fwmark space to mark packets for SNAT.
iptablesMasqueradeBit int
// The bit of the fwmark space to mark packets for dropping.
iptablesDropBit int
// The AppArmor validator for checking whether AppArmor is supported.
appArmorValidator apparmor.Validator
}
// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = path.Clean(kl.rootDirectory)
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
return fmt.Errorf("error creating root directory: %v", err)
}
if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
return fmt.Errorf("error creating pods directory: %v", err)
}
if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
return fmt.Errorf("error creating plugins directory: %v", err)
}
return nil
}
// Get a list of pods that have data directories.
func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
podInfos, err := ioutil.ReadDir(kl.getPodsDir())
if err != nil {
return nil, err
}
pods := []types.UID{}
for i := range podInfos {
if podInfos[i].IsDir() {
pods = append(pods, types.UID(podInfos[i].Name()))
}
}
return pods, nil
}
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
go wait.Until(func() {
if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
glog.Errorf("Container garbage collection failed: %v", err)
}
}, ContainerGCPeriod, wait.NeverStop)
go wait.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
glog.Errorf("Image garbage collection failed: %v", err)
}
}, ImageGCPeriod, wait.NeverStop)
}
// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
// Step 1: Promethues metrics.
metrics.Register(kl.runtimeCache)
// Step 2: Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
return err
}
// Step 3: If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := kl.os.MkdirAll(containerLogsDir, 0755); err != nil {
glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
}
}
// Step 4: Start the image manager.
if err := kl.imageManager.Start(); err != nil {
return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
// Step 5: Start container manager.
node, err := kl.getNodeAnyWay()
if err != nil {
glog.Errorf("Cannot get Node info: %v", err)
return fmt.Errorf("Kubelet failed to get node info.")
}
if err := kl.containerManager.Start(node); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}
// Step 6: Start out of memory watcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("Failed to start OOM watcher %v", err)
}
// Step 7: Start resource analyzer
kl.resourceAnalyzer.Start()
return nil
}
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
if err := kl.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
// TODO(random-liu): Add backoff logic in the babysitter
glog.Fatalf("Failed to start cAdvisor %v", err)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
if err := kl.evictionManager.Start(kl, kl.getActivePods, evictionMonitoringPeriod); err != nil {
kl.runtimeState.setInternalError(fmt.Errorf("failed to start eviction manager %v", err))
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.KubeletSetupFailed, err.Error())
glog.Error(err)
kl.runtimeState.setInitError(err)
}
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
}
go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
// Start the pod lifecycle event generator.
kl.pleg.Start()
kl.syncLoop(updates, kl)
}
// getActivePods returns non-terminal pods
func (kl *Kubelet) getActivePods() []*api.Pod {
allPods := kl.podManager.GetPods()
activePods := kl.filterOutTerminatedPods(allPods)
return activePods
}
// makeMounts determines the mount points for the given container.
func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if :
// - container does not use hostNetwork and
// - container is not an infrastructure(pause) container
// - container is not already mounting on /etc/hosts
// When the pause container is being created, its IP is still unknown. Hence, PodIP will not have been set.
mountEtcHostsFile := (pod.Spec.SecurityContext == nil || !pod.Spec.SecurityContext.HostNetwork) && len(podIP) > 0
glog.V(3).Infof("container: %v/%v/%v podIP: %q creating hosts mount: %v", pod.Namespace, pod.Name, container.Name, podIP, mountEtcHostsFile)
mounts := []kubecontainer.Mount{}
for _, mount := range container.VolumeMounts {
mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
vol, ok := podVolumes[mount.Name]
if !ok {
glog.Warningf("Mount cannot be satisfied for container %q, because the volume is missing: %q", container.Name, mount)
continue
}
relabelVolume := false
// If the volume supports SELinux and it has not been
// relabeled already and it is not a read-only volume,
// relabel it and mark it as labeled
if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux && !vol.SELinuxLabeled {
vol.SELinuxLabeled = true
relabelVolume = true
}
hostPath, err := volume.GetPath(vol.Mounter)
if err != nil {
return nil, err
}
if mount.SubPath != "" {
hostPath = filepath.Join(hostPath, mount.SubPath)
}
mounts = append(mounts, kubecontainer.Mount{
Name: mount.Name,
ContainerPath: mount.MountPath,
HostPath: hostPath,
ReadOnly: mount.ReadOnly,
SELinuxRelabel: relabelVolume,
})
}
if mountEtcHostsFile {
hostsMount, err := makeHostsMount(podDir, podIP, hostName, hostDomain)
if err != nil {
return nil, err
}
mounts = append(mounts, *hostsMount)
}
return mounts, nil
}
// makeHostsMount makes the mountpoint for the hosts file that the containers
// in a pod are injected with.
func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontainer.Mount, error) {
hostsFilePath := path.Join(podDir, "etc-hosts")
if err := ensureHostsFile(hostsFilePath, podIP, hostName, hostDomainName); err != nil {
return nil, err
}
return &kubecontainer.Mount{
Name: "k8s-managed-etc-hosts",
ContainerPath: etcHostsPath,
HostPath: hostsFilePath,
ReadOnly: false,
}, nil
}
// ensureHostsFile ensures that the given host file has an up-to-date ip, host
// name, and domain name.
func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error {
if _, err := os.Stat(fileName); os.IsExist(err) {
glog.V(4).Infof("kubernetes-managed etc-hosts file exits. Will not be recreated: %q", fileName)
return nil
}
var buffer bytes.Buffer
buffer.WriteString("# Kubernetes-managed hosts file.\n")
buffer.WriteString("127.0.0.1\tlocalhost\n") // ipv4 localhost
buffer.WriteString("::1\tlocalhost ip6-localhost ip6-loopback\n") // ipv6 localhost
buffer.WriteString("fe00::0\tip6-localnet\n")
buffer.WriteString("fe00::0\tip6-mcastprefix\n")
buffer.WriteString("fe00::1\tip6-allnodes\n")
buffer.WriteString("fe00::2\tip6-allrouters\n")
if len(hostDomainName) > 0 {
buffer.WriteString(fmt.Sprintf("%s\t%s.%s\t%s\n", hostIP, hostName, hostDomainName, hostName))
} else {
buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostIP, hostName))
}
return ioutil.WriteFile(fileName, buffer.Bytes(), 0644)
}
func makePortMappings(container *api.Container) (ports []kubecontainer.PortMapping) {
names := make(map[string]struct{})
for _, p := range container.Ports {
pm := kubecontainer.PortMapping{
HostPort: int(p.HostPort),
ContainerPort: int(p.ContainerPort),
Protocol: p.Protocol,
HostIP: p.HostIP,
}
// We need to create some default port name if it's not specified, since
// this is necessary for rkt.
// http://issue.k8s.io/7710
if p.Name == "" {
pm.Name = fmt.Sprintf("%s-%s:%d", container.Name, p.Protocol, p.ContainerPort)
} else {
pm.Name = fmt.Sprintf("%s-%s", container.Name, p.Name)
}
// Protect against exposing the same protocol-port more than once in a container.
if _, ok := names[pm.Name]; ok {
glog.Warningf("Port name conflicted, %q is defined more than once", pm.Name)
continue
}
ports = append(ports, pm)
names[pm.Name] = struct{}{}
}
return
}
func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *api.Pod) (string, string, error) {
// TODO(vmarmol): Handle better.
// Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char).
clusterDomain := kl.clusterDomain
const hostnameMaxLen = 63
podAnnotations := pod.Annotations
if podAnnotations == nil {
podAnnotations = make(map[string]string)
}
hostname := pod.Name
if len(pod.Spec.Hostname) > 0 {
if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Hostname); len(msgs) != 0 {
return "", "", fmt.Errorf("Pod Hostname %q is not a valid DNS label: %s", pod.Spec.Hostname, strings.Join(msgs, ";"))
}
hostname = pod.Spec.Hostname
} else {
hostnameCandidate := podAnnotations[utilpod.PodHostnameAnnotation]
if len(utilvalidation.IsDNS1123Label(hostnameCandidate)) == 0 {
// use hostname annotation, if specified.
hostname = hostnameCandidate
}
}
if len(hostname) > hostnameMaxLen {
hostname = hostname[:hostnameMaxLen]
glog.Errorf("hostname for pod:%q was longer than %d. Truncated hostname to :%q", pod.Name, hostnameMaxLen, hostname)
}
hostDomain := ""
if len(pod.Spec.Subdomain) > 0 {
if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Subdomain); len(msgs) != 0 {
return "", "", fmt.Errorf("Pod Subdomain %q is not a valid DNS label: %s", pod.Spec.Subdomain, strings.Join(msgs, ";"))
}
hostDomain = fmt.Sprintf("%s.%s.svc.%s", pod.Spec.Subdomain, pod.Namespace, clusterDomain)
} else {
subdomainCandidate := pod.Annotations[utilpod.PodSubdomainAnnotation]
if len(utilvalidation.IsDNS1123Label(subdomainCandidate)) == 0 {
hostDomain = fmt.Sprintf("%s.%s.svc.%s", subdomainCandidate, pod.Namespace, clusterDomain)
}
}
return hostname, hostDomain, nil
}
// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
var err error
opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}
hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
if err != nil {
return nil, err
}
opts.Hostname = hostname
podName := volumehelper.GetUniquePodName(pod)
volumes := kl.volumeManager.GetMountedVolumesForPod(podName)
opts.PortMappings = makePortMappings(container)
// Docker does not relabel volumes if the container is running
// in the host pid or ipc namespaces so the kubelet must
// relabel the volumes
if pod.Spec.SecurityContext != nil && (pod.Spec.SecurityContext.HostIPC || pod.Spec.SecurityContext.HostPID) {
err = kl.relabelVolumes(pod, volumes)
if err != nil {
return nil, err
}
}
opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
if err != nil {
return nil, err
}
opts.Envs, err = kl.makeEnvironmentVariables(pod, container, podIP)
if err != nil {
return nil, err
}
if len(container.TerminationMessagePath) != 0 {
p := kl.getPodContainerDir(pod.UID, container.Name)
if err := os.MkdirAll(p, 0750); err != nil {
glog.Errorf("Error on creating %q: %v", p, err)
} else {
opts.PodContainerDir = p
}
}
opts.DNS, opts.DNSSearch, err = kl.GetClusterDNS(pod)
if err != nil {
return nil, err
}
return opts, nil
}
var masterServices = sets.NewString("kubernetes")
// getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
var (
serviceMap = make(map[string]api.Service)
m = make(map[string]string)
)
// Get all service resources from the master (via a cache),
// and populate them into service environment variables.
if kl.serviceLister == nil {
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
return m, nil
}
services, err := kl.serviceLister.List()
if err != nil {
return m, fmt.Errorf("failed to list services when setting up env vars.")
}
// project the services in namespace ns onto the master services
for _, service := range services.Items {
// ignore services where ClusterIP is "None" or empty
if !api.IsServiceIPSet(&service) {
continue
}
serviceName := service.Name
switch service.Namespace {
// for the case whether the master service namespace is the namespace the pod
// is in, the pod should receive all the services in the namespace.
//
// ordering of the case clauses below enforces this
case ns:
serviceMap[serviceName] = service
case kl.masterServiceNamespace:
if masterServices.Has(serviceName) {
if _, exists := serviceMap[serviceName]; !exists {
serviceMap[serviceName] = service
}
}
}
}
services.Items = []api.Service{}
for _, service := range serviceMap {
services.Items = append(services.Items, service)
}
for _, e := range envvars.FromServices(&services) {
m[e.Name] = e.Value
}
return m, nil
}
// Make the environment variables for a pod in the given namespace.
func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container, podIP string) ([]kubecontainer.EnvVar, error) {
var result []kubecontainer.EnvVar
// Note: These are added to the docker Config, but are not included in the checksum computed
// by dockertools.BuildDockerName(...). That way, we can still determine whether an
// api.Container is already running by its hash. (We don't want to restart a container just
// because some service changed.)
//
// Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
// To avoid this users can: (1) wait between starting a service and starting; or (2) detect
// missing service env var and exit and be restarted; or (3) use DNS instead of env vars
// and keep trying to resolve the DNS name of the service (recommended).
serviceEnv, err := kl.getServiceEnvVarMap(pod.Namespace)
if err != nil {
return result, err
}
// Determine the final values of variables:
//
// 1. Determine the final value of each variable:
// a. If the variable's Value is set, expand the `$(var)` references to other
// variables in the .Value field; the sources of variables are the declared
// variables of the container and the service environment variables
// b. If a source is defined for an environment variable, resolve the source
// 2. Create the container's environment in the order variables are declared
// 3. Add remaining service environment vars
var (
tmpEnv = make(map[string]string)
configMaps = make(map[string]*api.ConfigMap)
secrets = make(map[string]*api.Secret)
mappingFunc = expansion.MappingFuncFor(tmpEnv, serviceEnv)
)
for _, envVar := range container.Env {
// Accesses apiserver+Pods.
// So, the master may set service env vars, or kubelet may. In case both are doing
// it, we delete the key from the kubelet-generated ones so we don't have duplicate
// env vars.
// TODO: remove this net line once all platforms use apiserver+Pods.
delete(serviceEnv, envVar.Name)
runtimeVal := envVar.Value
if runtimeVal != "" {
// Step 1a: expand variable references
runtimeVal = expansion.Expand(runtimeVal, mappingFunc)
} else if envVar.ValueFrom != nil {
// Step 1b: resolve alternate env var sources
switch {
case envVar.ValueFrom.FieldRef != nil:
runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod, podIP)
if err != nil {
return result, err
}
case envVar.ValueFrom.ResourceFieldRef != nil:
defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardApi(pod, container)
if err != nil {
return result, err
}
runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, defaultedContainer)
if err != nil {
return result, err
}
case envVar.ValueFrom.ConfigMapKeyRef != nil:
name := envVar.ValueFrom.ConfigMapKeyRef.Name
key := envVar.ValueFrom.ConfigMapKeyRef.Key
configMap, ok := configMaps[name]
if !ok {
configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name)
if err != nil {
return result, err
}
}
runtimeVal, ok = configMap.Data[key]
if !ok {
return result, fmt.Errorf("Couldn't find key %v in ConfigMap %v/%v", key, pod.Namespace, name)
}
case envVar.ValueFrom.SecretKeyRef != nil:
name := envVar.ValueFrom.SecretKeyRef.Name
key := envVar.ValueFrom.SecretKeyRef.Key
secret, ok := secrets[name]
if !ok {
secret, err = kl.kubeClient.Core().Secrets(pod.Namespace).Get(name)
if err != nil {
return result, err
}
}
runtimeValBytes, ok := secret.Data[key]
if !ok {
return result, fmt.Errorf("Couldn't find key %v in Secret %v/%v", key, pod.Namespace, name)
}
runtimeVal = string(runtimeValBytes)
}
}
tmpEnv[envVar.Name] = runtimeVal
result = append(result, kubecontainer.EnvVar{Name: envVar.Name, Value: tmpEnv[envVar.Name]})
}
// Append remaining service env vars.
for k, v := range serviceEnv {
result = append(result, kubecontainer.EnvVar{Name: k, Value: v})
}
return result, nil
}
// podFieldSelectorRuntimeValue returns the runtime value of the given
// selector for a pod.
func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod, podIP string) (string, error) {
internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
if err != nil {
return "", err
}
switch internalFieldPath {
case "spec.nodeName":
return pod.Spec.NodeName, nil
case "spec.serviceAccountName":
return pod.Spec.ServiceAccountName, nil
case "status.podIP":
return podIP, nil
}
return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath)
}
// containerResourceRuntimeValue returns the value of the provided container resource
func containerResourceRuntimeValue(fs *api.ResourceFieldSelector, pod *api.Pod, container *api.Container) (string, error) {
containerName := fs.ContainerName
if len(containerName) == 0 {
return fieldpath.ExtractContainerResourceValue(fs, container)
} else {
return fieldpath.ExtractResourceValueByContainerName(fs, pod, containerName)
}
}
// GetClusterDNS returns a list of the DNS servers and a list of the DNS search
// domains of the cluster.
func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
var hostDNS, hostSearch []string
// Get host DNS settings
if kl.resolverConfig != "" {
f, err := os.Open(kl.resolverConfig)
if err != nil {
return nil, nil, err
}
defer f.Close()
hostDNS, hostSearch, err = kl.parseResolvConf(f)
if err != nil {
return nil, nil, err
}
}
useClusterFirstPolicy := pod.Spec.DNSPolicy == api.DNSClusterFirst
if useClusterFirstPolicy && kl.clusterDNS == nil {
// clusterDNS is not known.
// pod with ClusterDNSFirst Policy cannot be created
kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)
// fallback to DNSDefault
useClusterFirstPolicy = false
}
if !useClusterFirstPolicy {
// When the kubelet --resolv-conf flag is set to the empty string, use
// DNS settings that override the docker default (which is to use
// /etc/resolv.conf) and effectively disable DNS lookups. According to
// the bind documentation, the behavior of the DNS client library when
// "nameservers" are not specified is to "use the nameserver on the
// local machine". A nameserver setting of localhost is equivalent to
// this documented behavior.
if kl.resolverConfig == "" {
hostDNS = []string{"127.0.0.1"}
hostSearch = []string{"."}
}
return hostDNS, hostSearch, nil
}
// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
// the pod. The cluster DNS server itself will forward queries to other nameservers that is configured to use,
// in case the cluster DNS server cannot resolve the DNS query itself
dns := []string{kl.clusterDNS.String()}
var dnsSearch []string
if kl.clusterDomain != "" {
nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
} else {
dnsSearch = hostSearch
}
return dns, dnsSearch, nil
}
// One of the following arguments must be non-nil: runningPod, status.
// TODO: Modify containerRuntime.KillPod() to accept the right arguments.
func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error {
var p kubecontainer.Pod
if runningPod != nil {
p = *runningPod
} else if status != nil {
p = kubecontainer.ConvertPodStatusToRunningPod(status)
}
return kl.containerRuntime.KillPod(pod, p, gracePeriodOverride)
}
// makePodDataDirs creates the dirs for the pod datas.
func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
uid := pod.UID
if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
return err
}
return nil
}
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an api.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step if this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
killPodOptions := o.killPodOptions
if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
return fmt.Errorf("kill pod options are required if update type is kill")
}
apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we kill the pod with the specified grace period since this is a termination
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
return nil
}
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by the API server.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}
// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
} else {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
}
}
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IP = apiPodStatus.PodIP
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Kill pod if it should not be running
if errOuter := canRunPod(pod); errOuter != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
if errInner := kl.killPod(pod, nil, podStatus, nil); errInner != nil {
errOuter = fmt.Errorf("error killing pod: %v", errInner)
utilruntime.HandleError(errOuter)
}
// there was no error killing the pod, but the pod cannot be run, so we return that err (if any)
return errOuter
}
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubepod.IsStaticPod(pod) {
podFullName := kubecontainer.GetPodFullName(pod)
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
} else {
deleted = true
}
}
}
if mirrorPod == nil || deleted {
glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
}
}
}
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
return err
}
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, api.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err
}
// Fetch the pull secrets for the pod
pullSecrets, err := kl.getPullSecretsForPod(pod)
if err != nil {
glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
return err
}
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err = result.Error(); err != nil {
return err
}
// early successful exit if pod is not bandwidth-constrained
if !kl.shapingEnabled() {
return nil
}
// Update the traffic shaping for the pod's ingress and egress limits
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return err
}
if egress != nil || ingress != nil {
if podUsesHostNetwork(pod) {
kl.recorder.Event(pod, api.EventTypeWarning, events.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network")
} else if kl.shaper != nil {
if len(apiPodStatus.PodIP) > 0 {
err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress)
}
} else {
kl.recorder.Event(pod, api.EventTypeWarning, events.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined")
}
}
return nil
}
// returns whether the pod uses the host network namespace.
func podUsesHostNetwork(pod *api.Pod) bool {
return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
}
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull
// secrets.
// TODO: duplicate secrets are being retrieved multiple times and there
// is no cache. Creating and using a secret manager interface will make this
// easier to address.
func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
pullSecrets := []api.Secret{}
for _, secretRef := range pod.Spec.ImagePullSecrets {
secret, err := kl.kubeClient.Core().Secrets(pod.Namespace).Get(secretRef.Name)
if err != nil {
glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
continue
}
pullSecrets = append(pullSecrets, *secret)
}
return pullSecrets, nil
}
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * internal modules that request sync of a pod.
func (kl *Kubelet) getPodsToSync() []*api.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
podUIDSet := sets.NewString()
for _, podUID := range podUIDs {
podUIDSet.Insert(string(podUID))
}
var podsToSync []*api.Pod
for _, pod := range allPods {
if podUIDSet.Has(string(pod.UID)) {
// The work of the pod is ready
podsToSync = append(podsToSync, pod)
continue
}
for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
if podSyncLoopHandler.ShouldSync(pod) {
podsToSync = append(podsToSync, pod)
break
}
}
}
return podsToSync
}
// Returns true if pod is in the terminated state ("Failed" or "Succeeded").
func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
var status api.PodStatus
// Check the cached pod status which was set after the last sync.
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
status = pod.Status
}
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
return true
}
return false
}
// filterOutTerminatedPods returns the given pods which the status manager
// does not consider failed or succeeded.
func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod {
var filteredPods []*api.Pod
for _, p := range pods {
if kl.podIsTerminated(p) {
continue
}
filteredPods = append(filteredPods, p)
}
return filteredPods
}
// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podUIDs[pod.UID] = true
}
for _, pod := range mirrorPods {
podUIDs[pod.UID] = true
}
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// deletePod deletes the pod from the internal state of the kubelet by:
// 1. stopping the associated pod worker asynchronously
// 2. signaling to kill the pod by sending on the podKillingCh channel
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *api.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
}
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip deletion, as we may accidentally delete pods
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
kl.podWorkers.ForgetWorker(pod.UID)
// Runtime cache may not have been updated to with the pod, but it's okay
// because the periodic cleanup routine will attempt to delete again later.
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
return fmt.Errorf("error listing containers: %v", err)
}
runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
if runningPod.IsEmpty() {
return fmt.Errorf("pod not found")
}
podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
kl.podKillingCh <- &podPair
// TODO: delete the mirror pod here?
// We leave the volume/directory cleanup to the periodic cleanup routine.
return nil
}
// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
func (kl *Kubelet) HandlePodCleanups() error {
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
// TODO: the logic here does not handle two cases:
// 1. If the containers were removed immediately after they died, kubelet
// may fail to generate correct statuses, let alone filtering correctly.
// 2. If kubelet restarted before writing the terminated status for a pod
// to the apiserver, it could still restart the terminated pod (even
// though the pod was not considered terminated by the apiserver).
// These two conditions could be alleviated by checkpointing kubelet.
activePods := kl.filterOutTerminatedPods(allPods)
desiredPods := make(map[types.UID]empty)
for _, pod := range activePods {
desiredPods[pod.UID] = empty{}
}
// Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods)
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
for _, pod := range runningPods {
if _, found := desiredPods[pod.ID]; !found {
kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
}
}
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Note that we just killed the unwanted pods. This may not have reflected
// in the cache. We need to bypass the cache to get the latest set of
// running pods to clean up the volumes.
// TODO: Evaluate the performance impact of bypassing the runtime cache.
runningPods, err = kl.containerRuntime.GetPods(false)
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Remove any orphaned volumes.
// Note that we pass all pods (including terminated pods) to the function,
// so that we don't remove volumes associated with terminated but not yet
// deleted pods.
err = kl.cleanupOrphanedPodDirs(allPods, runningPods)
if err != nil {
// We want all cleanup tasks to be run even if one of them failed. So
// we just log an error here and continue other cleanup tasks.
// This also applies to the other clean up tasks.
glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
}
// Remove any orphaned mirror pods.
kl.podManager.DeleteOrphanedMirrorPods()
// Clear out any old bandwidth rules
err = kl.cleanupBandwidthLimits(allPods)
if err != nil {
glog.Errorf("Failed cleaning up bandwidth limits: %v", err)
}
kl.backOff.GC()
return nil
}
// podKiller launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
killing := sets.NewString()
resultCh := make(chan types.UID)
defer close(resultCh)
for {
select {
case podPair, ok := <-kl.podKillingCh:
if !ok {
return
}
runningPod := podPair.RunningPod
apiPod := podPair.APIPod
if killing.Has(string(runningPod.ID)) {
// The pod is already being killed.
break
}
killing.Insert(string(runningPod.ID))
go func(apiPod *api.Pod, runningPod *kubecontainer.Pod, ch chan types.UID) {
defer func() {
ch <- runningPod.ID
}()
glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
err := kl.killPod(apiPod, runningPod, nil, nil)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
}
}(apiPod, runningPod, resultCh)
case podID := <-resultCh:
killing.Delete(string(podID))
}
}
}
// checkHostPortConflicts detects pods with conflicted host ports.
func hasHostPortConflicts(pods []*api.Pod) bool {
ports := sets.String{}
for _, pod := range pods {
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
return true
}
if errs := validation.AccumulateUniqueHostPorts(pod.Spec.InitContainers, &ports, field.NewPath("spec", "initContainers")); len(errs) > 0 {
glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
return true
}
}
return false
}
// handleOutOfDisk detects if pods can't fit due to lack of disk space.
func (kl *Kubelet) isOutOfDisk() bool {
// Check disk space once globally and reject or accept all new pods.
withinBounds, err := kl.diskSpaceManager.IsRuntimeDiskSpaceAvailable()
// Assume enough space in case of errors.
if err != nil {
glog.Errorf("Failed to check if disk space is available for the runtime: %v", err)
} else if !withinBounds {
return true
}
withinBounds, err = kl.diskSpaceManager.IsRootDiskSpaceAvailable()
// Assume enough space in case of errors.
if err != nil {
glog.Errorf("Failed to check if disk space is available on the root partition: %v", err)
} else if !withinBounds {
return true
}
return false
}
// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manage.
func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
kl.recorder.Eventf(pod, api.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(pod, api.PodStatus{
Phase: api.PodFailed,
Reason: reason,
Message: "Pod " + message})
}
// canAdmitPod determines if a pod can be admitted, and gives a reason if it
// cannot. "pod" is new pod, while "pods" are all admitted pods
// The function returns a boolean value indicating whether the pod
// can be admitted, a brief single-word reason and a message explaining why
// the pod cannot be admitted.
func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
node, err := kl.getNodeAnyWay()
if err != nil {
glog.Errorf("Cannot get Node info: %v", err)
return false, "InvalidNodeInfo", "Kubelet cannot get node info."
}
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
// TODO: move predicate check into a pod admitter
// TODO: move out of disk check into a pod admitter
// TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
for _, podAdmitHandler := range kl.PodAdmitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
nodeInfo := schedulercache.NewNodeInfo(pods...)
nodeInfo.SetNode(node)
fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
if err != nil {
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
return fit, "UnexpectedError", message
}
if !fit {
var reason string
var message string
if len(reasons) == 0 {
message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
return fit, "UnknownReason", message
}
// If there are failed predicates, we only return the first one as a reason.
r := reasons[0]
switch re := r.(type) {
case *predicates.PredicateFailureError:
reason = re.PredicateName
message = re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
case *predicates.InsufficientResourceError:
reason = fmt.Sprintf("OutOf%s", re.ResourceName)
message := re.Error()
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
case *predicates.FailureReason:
reason = re.GetReason()
message = fmt.Sprintf("Failure: %s", re.GetReason())
glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
default:
reason = "UnexpectedPredicateFailureType"
message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
}
return fit, reason, message
}
// TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and
// add the disk space predicate to predicates.GeneralPredicates.
if kl.isOutOfDisk() {
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to OutOfDisk")
return false, "OutOfDisk", "cannot be started due to lack of disk space."
}
return true, "", ""
}
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
// The resyncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
for {
if rs := kl.runtimeState.errors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
time.Sleep(5 * time.Second)
continue
}
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
}
}
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. houseKeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
kl.syncLoopMonitor.Store(kl.clock.Now())
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejcted. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
// Mark the source ready after receiving at least one update from the
// source. Once all the sources are marked ready, various cleanup
// routines will start reclaiming resources. It is important that this
// takes place only after kubelet calls the update handler to process
// the update to ensure the internal pod cache is up-to-date.
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*api.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
kl.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*api.Pod{pod})
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
glog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
glog.Errorf("Failed cleaning pods: %v", err)
}
}
}
kl.syncLoopMonitor.Store(kl.clock.Now())
return true
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
if kl.podIsTerminated(pod) {
if pod.DeletionTimestamp != nil {
// If the pod is in a terminated state, there is no pod worker to
// handle the work item. Check if the DeletionTimestamp has been
// set, and force a status update to trigger a pod deletion request
// to the apiserver.
kl.statusManager.TerminatePod(pod)
}
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
// TODO: handle mirror pods in a separate component (issue #17251)
func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
// corresponding static pod. Send update to the pod worker if the static
// pod exists.
if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
}
}
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
start := kl.clock.Now()
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
if kubepod.IsMirrorPod(pod) {
kl.podManager.AddPod(pod)
kl.handleMirrorPod(pod, start)
continue
}
// Note that allPods excludes the new pod.
allPods := kl.podManager.GetPods()
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(allPods)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
kl.podManager.AddPod(pod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// TODO: Evaluate if we need to validate and reject updates.
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
}
}
// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
}
kl.probeManager.RemovePod(pod)
}
}
// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
for _, pod := range pods {
// Update the pod in pod manager, status manager will do periodically reconcile according
// to the pod manager.
kl.podManager.UpdatePod(pod)
// After an evicted pod is synced, all dead containers in the pod can be removed.
if eviction.PodIsEvicted(pod.Status) {
if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
kl.containerDeletor.deleteContainersInPod("", podStatus, true)
}
}
}
}
// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}
}
// LatestLoopEntryTime returns the last time in the sync loop monitor.
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
val := kl.syncLoopMonitor.Load()
if val == nil {
return time.Time{}
}
return val.(time.Time)
}
// PLEGHealthCheck returns whether the PLEG is healty.
func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
return kl.pleg.Healthy()
}
// validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
// of the container. The previous flag will only return the logs for the last terminated container, otherwise, the current
// running container is preferred over a previous termination. If info about the container is not available then a specific
// error is returned to the end user.
func (kl *Kubelet) validateContainerLogStatus(podName string, podStatus *api.PodStatus, containerName string, previous bool) (containerID kubecontainer.ContainerID, err error) {
var cID string
cStatus, found := api.GetContainerStatus(podStatus.ContainerStatuses, containerName)
// if not found, check the init containers
if !found {
cStatus, found = api.GetContainerStatus(podStatus.InitContainerStatuses, containerName)
}
if !found {
return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is not available", containerName, podName)
}
lastState := cStatus.LastTerminationState
waiting, running, terminated := cStatus.State.Waiting, cStatus.State.Running, cStatus.State.Terminated
switch {
case previous:
if lastState.Terminated == nil {
return kubecontainer.ContainerID{}, fmt.Errorf("previous terminated container %q in pod %q not found", containerName, podName)
}
cID = lastState.Terminated.ContainerID
case running != nil:
cID = cStatus.ContainerID
case terminated != nil:
cID = terminated.ContainerID
case lastState.Terminated != nil:
cID = lastState.Terminated.ContainerID
case waiting != nil:
// output some info for the most common pending failures
switch reason := waiting.Reason; reason {
case images.ErrImagePull.Error():
return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: image can't be pulled", containerName, podName)
case images.ErrImagePullBackOff.Error():
return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: trying and failing to pull image", containerName, podName)
default:
return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: %v", containerName, podName, reason)
}
default:
// unrecognized state
return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start - no logs yet", containerName, podName)
}
return kubecontainer.ParseContainerID(cID), nil
}
// GetKubeletContainerLogs returns logs from the container
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
if err != nil {
return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
}
pod, ok := kl.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("pod %q cannot be found - no logs available", name)
}
podUID := pod.UID
if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
podUID = mirrorPod.UID
}
podStatus, found := kl.statusManager.GetPodStatus(podUID)
if !found {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
podStatus = pod.Status
}
containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous)
if err != nil {
return err
}
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
}
// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up,
// and returns an error if the status check fails. If the status check is OK,
// update the container runtime uptime in the kubelet runtimeState.
func (kl *Kubelet) updateRuntimeUp() {
if err := kl.containerRuntime.Status(); err != nil {
glog.Errorf("Container runtime sanity check failed: %v", err)
return
}
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadvisorapi.MachineInfo) {
if info.CloudProvider != cadvisorapi.UnknownProvider &&
info.CloudProvider != cadvisorapi.Baremetal {
// The cloud providers from pkg/cloudprovider/providers/* that update ProviderID
// will use the format of cloudprovider://project/availability_zone/instance_name
// here we only have the cloudprovider and the instance name so we leave project
// and availability zone empty for compatibility.
node.Spec.ProviderID = strings.ToLower(string(info.CloudProvider)) +
":////" + string(info.InstanceID)
}
}
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with 3rd party kubelet
// integrations like kubernetes-mesos.
func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
initialized := 0
pendingInitialization := 0
failedInitialization := 0
for _, container := range spec.InitContainers {
containerStatus, ok := api.GetContainerStatus(info, container.Name)
if !ok {
pendingInitialization++
continue
}
switch {
case containerStatus.State.Running != nil:
pendingInitialization++
case containerStatus.State.Terminated != nil:
if containerStatus.State.Terminated.ExitCode == 0 {
initialized++
} else {
failedInitialization++
}
case containerStatus.State.Waiting != nil:
if containerStatus.LastTerminationState.Terminated != nil {
if containerStatus.LastTerminationState.Terminated.ExitCode == 0 {
initialized++
} else {
failedInitialization++
}
} else {
pendingInitialization++
}
default:
pendingInitialization++
}
}
unknown := 0
running := 0
waiting := 0
stopped := 0
failed := 0
succeeded := 0
for _, container := range spec.Containers {
containerStatus, ok := api.GetContainerStatus(info, container.Name)
if !ok {
unknown++
continue
}
switch {
case containerStatus.State.Running != nil:
running++
case containerStatus.State.Terminated != nil:
stopped++
if containerStatus.State.Terminated.ExitCode == 0 {
succeeded++
} else {
failed++
}
case containerStatus.State.Waiting != nil:
if containerStatus.LastTerminationState.Terminated != nil {
stopped++
} else {
waiting++
}
default:
unknown++
}
}
if failedInitialization > 0 && spec.RestartPolicy == api.RestartPolicyNever {
return api.PodFailed
}
switch {
case pendingInitialization > 0:
fallthrough
case waiting > 0:
glog.V(5).Infof("pod waiting > 0, pending")
// One or more containers has not been started
return api.PodPending
case running > 0 && unknown == 0:
// All containers have been started, and at least
// one container is running
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
// All containers are terminated
if spec.RestartPolicy == api.RestartPolicyAlways {
// All containers are in the process of restarting
return api.PodRunning
}
if stopped == succeeded {
// RestartPolicy is not Always, and all
// containers are terminated in success
return api.PodSucceeded
}
if spec.RestartPolicy == api.RestartPolicyNever {
// RestartPolicy is Never, and all containers are
// terminated with at least one in failure
return api.PodFailed
}
// RestartPolicy is OnFailure, and at least one in failure
// and in the process of restarting
return api.PodRunning
default:
glog.V(5).Infof("pod default case, pending")
return api.PodPending
}
}
// generateAPIPodStatus creates the final API pod status for a pod, given the
// internal pod status.
func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
// check if an internal module has requested the pod is evicted.
for _, podSyncHandler := range kl.PodSyncHandlers {
if result := podSyncHandler.ShouldEvict(pod); result.Evict {
return api.PodStatus{
Phase: api.PodFailed,
Reason: result.Reason,
Message: result.Message,
}
}
}
s := kl.convertStatusToAPIStatus(pod, podStatus)
// Assume info is ready to process
spec := &pod.Spec
allStatus := append(append([]api.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
s.Phase = GetPhase(spec, allStatus)
kl.probeManager.UpdatePodStatus(pod.UID, s)
s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
// s (the PodStatus we are creating) will not have a PodScheduled condition yet, because converStatusToAPIStatus()
// does not create one. If the existing PodStatus has a PodScheduled condition, then copy it into s and make sure
// it is set to true. If the existing PodStatus does not have a PodScheduled condition, then create one that is set to true.
if _, oldPodScheduled := api.GetPodCondition(&pod.Status, api.PodScheduled); oldPodScheduled != nil {
s.Conditions = append(s.Conditions, *oldPodScheduled)
}
api.UpdatePodCondition(&pod.Status, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionTrue,
})
if !kl.standaloneMode {
hostIP, err := kl.getHostIPAnyWay()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {
s.HostIP = hostIP.String()
if podUsesHostNetwork(pod) && s.PodIP == "" {
s.PodIP = hostIP.String()
}
}
}
return *s
}
// convertStatusToAPIStatus creates an api PodStatus for the given pod from
// the given internal pod status. It is purely transformative and does not
// alter the kubelet state at all.
func (kl *Kubelet) convertStatusToAPIStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) *api.PodStatus {
var apiPodStatus api.PodStatus
apiPodStatus.PodIP = podStatus.IP
apiPodStatus.ContainerStatuses = kl.convertToAPIContainerStatuses(
pod, podStatus,
pod.Status.ContainerStatuses,
pod.Spec.Containers,
len(pod.Spec.InitContainers) > 0,
false,
)
apiPodStatus.InitContainerStatuses = kl.convertToAPIContainerStatuses(
pod, podStatus,
pod.Status.InitContainerStatuses,
pod.Spec.InitContainers,
len(pod.Spec.InitContainers) > 0,
true,
)
return &apiPodStatus
}
func (kl *Kubelet) convertToAPIContainerStatuses(pod *api.Pod, podStatus *kubecontainer.PodStatus, previousStatus []api.ContainerStatus, containers []api.Container, hasInitContainers, isInitContainer bool) []api.ContainerStatus {
convertContainerStatus := func(cs *kubecontainer.ContainerStatus) *api.ContainerStatus {
cid := cs.ID.String()
status := &api.ContainerStatus{
Name: cs.Name,
RestartCount: int32(cs.RestartCount),
Image: cs.Image,
ImageID: cs.ImageID,
ContainerID: cid,
}
switch cs.State {
case kubecontainer.ContainerStateRunning:
status.State.Running = &api.ContainerStateRunning{StartedAt: unversioned.NewTime(cs.StartedAt)}
case kubecontainer.ContainerStateExited:
status.State.Terminated = &api.ContainerStateTerminated{
ExitCode: int32(cs.ExitCode),
Reason: cs.Reason,
Message: cs.Message,
StartedAt: unversioned.NewTime(cs.StartedAt),
FinishedAt: unversioned.NewTime(cs.FinishedAt),
ContainerID: cid,
}
default:
status.State.Waiting = &api.ContainerStateWaiting{}
}
return status
}
// Fetch old containers statuses from old pod status.
oldStatuses := make(map[string]api.ContainerStatus, len(containers))
for _, status := range previousStatus {
oldStatuses[status.Name] = status
}
// Set all container statuses to default waiting state
statuses := make(map[string]*api.ContainerStatus, len(containers))
defaultWaitingState := api.ContainerState{Waiting: &api.ContainerStateWaiting{Reason: "ContainerCreating"}}
if hasInitContainers {
defaultWaitingState = api.ContainerState{Waiting: &api.ContainerStateWaiting{Reason: "PodInitializing"}}
}
for _, container := range containers {
status := &api.ContainerStatus{
Name: container.Name,
Image: container.Image,
State: defaultWaitingState,
}
// Apply some values from the old statuses as the default values.
if oldStatus, found := oldStatuses[container.Name]; found {
status.RestartCount = oldStatus.RestartCount
status.LastTerminationState = oldStatus.LastTerminationState
}
statuses[container.Name] = status
}
// Make the latest container status comes first.
sort.Sort(sort.Reverse(kubecontainer.SortContainerStatusesByCreationTime(podStatus.ContainerStatuses)))
// Set container statuses according to the statuses seen in pod status
containerSeen := map[string]int{}
for _, cStatus := range podStatus.ContainerStatuses {
cName := cStatus.Name
if _, ok := statuses[cName]; !ok {
// This would also ignore the infra container.
continue
}
if containerSeen[cName] >= 2 {
continue
}
status := convertContainerStatus(cStatus)
if containerSeen[cName] == 0 {
statuses[cName] = status
} else {
statuses[cName].LastTerminationState = status.State
}
containerSeen[cName] = containerSeen[cName] + 1
}
// Handle the containers failed to be started, which should be in Waiting state.
for _, container := range containers {
if isInitContainer {
// If the init container is terminated with exit code 0, it won't be restarted.
// TODO(random-liu): Handle this in a cleaner way.
s := podStatus.FindContainerStatusByName(container.Name)
if s != nil && s.State == kubecontainer.ContainerStateExited && s.ExitCode == 0 {
continue
}
}
// If a container should be restarted in next syncpod, it is *Waiting*.
if !kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
continue
}
status := statuses[container.Name]
reason, message, ok := kl.reasonCache.Get(pod.UID, container.Name)
if !ok {
// In fact, we could also apply Waiting state here, but it is less informative,
// and the container will be restarted soon, so we prefer the original state here.
// Note that with the current implementation of ShouldContainerBeRestarted the original state here
// could be:
// * Waiting: There is no associated historical container and start failure reason record.
// * Terminated: The container is terminated.
continue
}
if status.State.Terminated != nil {
status.LastTerminationState = status.State
}
status.State = api.ContainerState{
Waiting: &api.ContainerStateWaiting{
Reason: reason.Error(),
Message: message,
},
}
statuses[container.Name] = status
}
var containerStatuses []api.ContainerStatus
for _, status := range statuses {
containerStatuses = append(containerStatuses, *status)
}
// Sort the container statuses since clients of this interface expect the list
// of containers in a pod has a deterministic order.
if isInitContainer {
kubetypes.SortInitContainerStatuses(pod, containerStatuses)
} else {
sort.Sort(kubetypes.SortedContainerStatuses(containerStatuses))
}
return containerStatuses
}
// Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve
kl.logServer.ServeHTTP(w, req)
}
// findContainer finds and returns the container with the given pod ID, full name, and container name.
// It returns nil if not found.
func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) {
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return nil, err
}
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
return pod.FindContainerByName(containerName), nil
}
// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containerName string, cmd []string) ([]byte, error) {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return nil, err
}
if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName)
}
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
}
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
podUID = kl.podManager.TranslatePodUID(podUID)
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
podUID = kl.podManager.TranslatePodUID(podUID)
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
}
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
}
return kl.runner.PortForward(&pod, port, stream)
}
// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
func (kl *Kubelet) GetConfiguration() componentconfig.KubeletConfiguration {
return kl.kubeletConfiguration
}
// BirthCry sends an event that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
// Make an event that kubelet restarted.
kl.recorder.Eventf(kl.nodeRef, api.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
}
// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval
}
// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
}
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
func (kl *Kubelet) cleanUpContainersInPod(podId types.UID, exitedContainerID string) {
if podStatus, err := kl.podCache.Get(podId); err == nil {
removeAll := false
if syncedPod, ok := kl.podManager.GetPodByUID(podId); ok {
// When an evicted pod has already synced, all containers can be removed.
removeAll = eviction.PodIsEvicted(syncedPod.Status)
}
kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
}
}
// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContatnerRemoved doesn't affect pod state
return event.Type != pleg.ContainerRemoved
}
func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) {
rl := make(api.ResourceList)
for k, v := range m {
switch api.ResourceName(k) {
// Only CPU and memory resources are supported.
case api.ResourceCPU, api.ResourceMemory:
q, err := resource.ParseQuantity(v)
if err != nil {
return nil, err
}
if q.Sign() == -1 {
return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
}
rl[api.ResourceName(k)] = q
default:
return nil, fmt.Errorf("cannot reserve %q resource", k)
}
}
return rl, nil
}
func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
reservation := new(kubetypes.Reservation)
if rl, err := parseResourceList(kubeReserved); err != nil {
return nil, err
} else {
reservation.Kubernetes = rl
}
if rl, err := parseResourceList(systemReserved); err != nil {
return nil, err
} else {
reservation.System = rl
}
return reservation, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.4.6

搜索帮助