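// 代码拉取完成,页面将自动刷新 (web-UI status banner captured with this file: "code pull complete, the page will refresh automatically"; not part of the Go source)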
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"fmt"
"net"
"net/http"
"net/url"
"os"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
clientgoclientset "k8s.io/client-go/kubernetes"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/integer"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
certificates "k8s.io/kubernetes/pkg/apis/certificates/v1beta1"
"k8s.io/kubernetes/pkg/apis/componentconfig"
componentconfigv1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
clientcertificates "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/certificates/v1beta1"
v1coregenerated "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker"
dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/gpu"
"k8s.io/kubernetes/pkg/kubelet/gpu/nvidia"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/preemption"
"k8s.io/kubernetes/pkg/kubelet/prober"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/secret"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/security/apparmor"
"k8s.io/kubernetes/pkg/util/bandwidth"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
kubeio "k8s.io/kubernetes/pkg/util/io"
utilipt "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)
const (
	// maxWaitForContainerRuntime is the maximum amount of time to wait for
	// the container runtime to come up.
	maxWaitForContainerRuntime = 30 * time.Second

	// nodeStatusUpdateRetry is how many times the kubelet retries when
	// posting node status fails.
	nodeStatusUpdateRetry = 5

	// ContainerLogsDir is the location of container logs.
	ContainerLogsDir = "/var/log/containers"

	// MaxContainerBackOff is the maximum backoff period; it is exported for
	// the e2e test.
	MaxContainerBackOff = 5 * time.Minute

	// podKillingChannelCapacity is the capacity of the channel that stores
	// pods to kill. A small number should suffice because a goroutine is
	// dedicated to draining the channel and does not block on anything else.
	podKillingChannelCapacity = 50

	// housekeepingPeriod is the period for performing global cleanup tasks.
	housekeepingPeriod = 2 * time.Second

	// evictionMonitoringPeriod is the period for performing eviction
	// monitoring.
	// TODO ensure this is in sync with internal cadvisor housekeeping.
	evictionMonitoringPeriod = 10 * time.Second

	// etcHostsPath is the path in containers' filesystems where the hosts
	// file is mounted.
	etcHostsPath = "/etc/hosts"

	// plegChannelCapacity is the capacity of the channel that receives pod
	// lifecycle events. The number is a bit arbitrary and may be adjusted in
	// the future.
	plegChannelCapacity = 1000

	// plegRelistPeriod is how often the generic PLEG relists to discover
	// container events. A longer period means the kubelet takes longer to
	// detect container changes and to update pod status; a shorter period
	// causes more frequent relisting (e.g., container runtime operations)
	// and therefore higher cpu usage.
	// Note that even with a 1s period, a single relist can take more than 1s
	// if the container runtime responds slowly and/or there are many
	// container changes in one cycle.
	plegRelistPeriod = time.Second

	// backOffPeriod is the period to back off when pod syncing results in an
	// error. It is also the base period for the exponential backoff applied
	// to container restarts and image pulls.
	backOffPeriod = 10 * time.Second

	// ContainerGCPeriod is the period for performing container garbage
	// collection.
	ContainerGCPeriod = 60 * time.Second

	// ImageGCPeriod is the period for performing image garbage collection.
	ImageGCPeriod = 300 * time.Second

	// minDeadContainerInPod is the minimum number of dead containers to keep
	// in a pod.
	minDeadContainerInPod = 1
)
// SyncHandler is an interface implemented by Kubelet, for testability.
//
// Every method except HandlePodCleanups receives a slice of pods and returns
// nothing; HandlePodCleanups takes no arguments and returns an error.
// NOTE(review): the per-method semantics (add/update/remove/reconcile/sync)
// are implied by the method names only; the implementations are not visible
// in this part of the file.
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups() error
}
// Option is a functional option type for Kubelet. NewMainKubelet calls each
// entry of KubeletDeps.Options, in slice order, with the Kubelet it is
// constructing.
type Option func(*Kubelet)
// KubeletBootstrap is the bootstrapping interface for the kubelet; it targets
// the initialization protocol. It is the type returned by a KubeletBuilder.
type KubeletBootstrap interface {
GetConfiguration() componentconfig.KubeletConfiguration
BirthCry()
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool)
ListenAndServeReadOnly(address net.IP, port uint)
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
// KubeletBuilder creates and initializes a Kubelet instance and returns it as
// a KubeletBootstrap. Its parameter list is the same as NewMainKubelet's.
type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, crOptions *options.ContainerRuntimeOptions, standaloneMode bool, hostnameOverride, nodeIP, providerID string) (KubeletBootstrap, error)
// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type KubeletDeps struct {
// TODO(mtaufen): KubeletBuilder:
// Mesos currently uses this as a hook that lets them wrap the
// KubeletBootstrap returned by CreateAndInitKubelet with their own
// KubeletBootstrap. It's a useful hook. I need to think about what
// a nice home for it would be. There seems to be a trend, between this and
// the Options fields below, of providing hooks where you can add extra functionality
// to the Kubelet for your solution. Maybe we should centralize these sorts of things?
Builder KubeletBuilder
// TODO(mtaufen): ContainerRuntimeOptions and Options:
// Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
// seem like a difficult path to trace when it's time to debug something.
// I'm leaving these fields here for now, but there is likely an easier-to-follow
// way to support their intended use cases. E.g. ContainerRuntimeOptions
// is used by Mesos to set an environment variable in containers which has
// some connection to their container GC. It seems that Mesos intends to use
// Options to add additional node conditions that are updated as part of the
// Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
// We should think about providing more explicit ways of doing these things.
ContainerRuntimeOptions []kubecontainer.Option
// Options are applied to the Kubelet, in order, by NewMainKubelet.
Options []Option
// Injected Dependencies
Auth server.AuthInterface
CAdvisorInterface cadvisor.Interface
// Cloud may be nil. When it is non-nil, NewMainKubelet asks it for the
// current node name instead of using the hostname.
Cloud cloudprovider.Interface
ContainerManager cm.ContainerManager
DockerClient libdocker.Interface
EventClient v1core.EventsGetter
HeartbeatClient v1coregenerated.CoreV1Interface
// KubeClient may be nil. When it is nil, makePodSourceConfig does not add
// the apiserver pod source and NewMainKubelet does not start the service
// and node reflectors.
KubeClient clientset.Interface
ExternalKubeClient clientgoclientset.Interface
Mounter mount.Interface
NetworkPlugins []network.NetworkPlugin
OOMAdjuster *oom.OOMAdjuster
OSInterface kubecontainer.OSInterface
// PodConfig may be nil, in which case NewMainKubelet populates it using
// makePodSourceConfig.
PodConfig *config.PodConfig
Recorder record.EventRecorder
Writer kubeio.Writer
VolumePlugins []volume.VolumePlugin
// TLSOptions may be nil. When it is non-nil and the
// RotateKubeletServerCertificate feature gate is enabled, NewMainKubelet
// sets TLSOptions.Config.GetCertificate.
TLSOptions *server.TLSOptions
}
// makePodSourceConfig builds a config.PodConfig and attaches to it every pod
// source enabled by the arguments:
//
//   - a file source when kubeCfg.PodManifestPath is set,
//   - a URL source when kubeCfg.ManifestURL is set,
//   - an apiserver source when kubeDeps.KubeClient is non-nil.
//
// kubeCfg.ManifestURLHeader, when non-empty, must be of the form "key:value"
// with exactly one ':'; otherwise an error is returned before any PodConfig is
// created. The parsed header is handed to the URL source.
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName types.NodeName) (*config.PodConfig, error) {
	urlHeader := make(http.Header)
	if raw := kubeCfg.ManifestURLHeader; raw != "" {
		parts := strings.Split(raw, ":")
		if len(parts) != 2 {
			return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", raw)
		}
		key, value := parts[0], parts[1]
		urlHeader.Set(key, value)
	}

	// The PodConfig is the single aggregation point for all sources below.
	podConfig := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// File source.
	if manifestPath := kubeCfg.PodManifestPath; manifestPath != "" {
		glog.Infof("Adding manifest file: %v", manifestPath)
		fileUpdates := podConfig.Channel(kubetypes.FileSource)
		config.NewSourceFile(manifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, fileUpdates)
	}

	// URL source.
	if manifestURL := kubeCfg.ManifestURL; manifestURL != "" {
		glog.Infof("Adding manifest url %q with HTTP header %v", manifestURL, urlHeader)
		urlUpdates := podConfig.Channel(kubetypes.HTTPSource)
		config.NewSourceURL(manifestURL, urlHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, urlUpdates)
	}

	// Apiserver source.
	if kubeDeps.KubeClient != nil {
		glog.Infof("Watching apiserver")
		apiUpdates := podConfig.Channel(kubetypes.ApiserverSource)
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, apiUpdates)
	}

	return podConfig, nil
}
// getRuntimeAndImageServices connects to the remote runtime endpoint and the
// remote image endpoint named in config, using config.RuntimeRequestTimeout
// for both. It returns the two services, or the first error encountered (in
// which case both services are nil).
func getRuntimeAndImageServices(config *componentconfig.KubeletConfiguration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
	requestTimeout := config.RuntimeRequestTimeout.Duration

	runtimeSvc, err := remote.NewRemoteRuntimeService(config.RemoteRuntimeEndpoint, requestTimeout)
	if err != nil {
		return nil, nil, err
	}

	imageSvc, err := remote.NewRemoteImageService(config.RemoteImageEndpoint, requestTimeout)
	if err != nil {
		return nil, nil, err
	}

	return runtimeSvc, imageSvc, nil
}
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
//
// NOTE: despite the sentence above, this function does start a few things as
// it wires modules together: the service and node reflectors (when a
// KubeClient is supplied) and, for the "docker" runtime, the dockershim
// service and its gRPC server.
//
// Parameters:
//   - kubeCfg: the kubelet configuration. It is validated (root directory,
//     sync frequency, iptables bits) and may be mutated: RemoteImageEndpoint is
//     defaulted to RemoteRuntimeEndpoint, and
//     ExperimentalCheckNodeCapabilitiesBeforeMount is forced to false when
//     ExperimentalMounterPath is set. A copy is stored on the returned Kubelet.
//   - kubeDeps: injected dependencies. PodConfig is populated if nil, and
//     TLSOptions.Config.GetCertificate is set when server certificate rotation
//     is enabled and TLSOptions is non-nil.
//   - crOptions: container runtime and network plugin options.
//   - standaloneMode: stored on the Kubelet.
//   - hostnameOverride: passed to nodeutil.GetHostname to determine the hostname.
//   - nodeIP: parsed with net.ParseIP and stored on the Kubelet.
//   - providerID: stored on the Kubelet.
//
// It returns the constructed Kubelet, or an error if validation or the
// construction of any module fails. An invalid hairpin mode is treated as
// non-recoverable and reported with glog.Fatalf rather than returned.
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, crOptions *options.ContainerRuntimeOptions, standaloneMode bool, hostnameOverride, nodeIP, providerID string) (*Kubelet, error) {
	if kubeCfg.RootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
	}
	if kubeCfg.SyncFrequency.Duration <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
	}

	// The iptables bits are only validated when the kubelet is asked to
	// create the utility chains that use them.
	if kubeCfg.MakeIPTablesUtilChains {
		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
		}
	}

	hostname := nodeutil.GetHostname(hostnameOverride)
	// Query the cloud provider for our node name, default to hostname
	nodeName := types.NodeName(hostname)
	// cloudIPs and cloudNames collect the addresses reported by the cloud
	// provider; they are only used for the serving certificate further below.
	cloudIPs := []net.IP{}
	cloudNames := []string{}
	if kubeDeps.Cloud != nil {
		var err error
		instances, ok := kubeDeps.Cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		nodeName, err = instances.CurrentNodeName(hostname)
		if err != nil {
			return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
		}

		glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)

		if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
			nodeAddresses, err := instances.NodeAddresses(nodeName)
			if err != nil {
				return nil, fmt.Errorf("failed to get the addresses of the current instance from the cloud provider: %v", err)
			}
			for _, nodeAddress := range nodeAddresses {
				switch nodeAddress.Type {
				case v1.NodeExternalIP, v1.NodeInternalIP:
					// Unparseable and loopback addresses are skipped.
					ip := net.ParseIP(nodeAddress.Address)
					if ip != nil && !ip.IsLoopback() {
						cloudIPs = append(cloudIPs, ip)
					}
				case v1.NodeExternalDNS, v1.NodeInternalDNS, v1.NodeHostName:
					cloudNames = append(cloudNames, nodeAddress.Address)
				}
			}
		}
	}

	// Build the pod sources only if the caller did not inject a PodConfig.
	if kubeDeps.PodConfig == nil {
		var err error
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
		if err != nil {
			return nil, err
		}
	}

	containerGCPolicy := kubecontainer.ContainerGCPolicy{
		MinAge:             kubeCfg.MinimumGCAge.Duration,
		MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
		MaxContainers:      int(kubeCfg.MaxContainerCount),
	}

	daemonEndpoints := &v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
	}

	imageGCPolicy := images.ImageGCPolicy{
		MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
		LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
	}

	// Both thresholds come from the same configuration value.
	diskSpacePolicy := DiskSpacePolicy{
		DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
		RootFreeDiskMB:   int(kubeCfg.LowDiskSpaceThresholdMB),
	}

	enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
	if kubeCfg.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
		// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
		enforceNodeAllocatable = []string{}
	}

	thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
	if err != nil {
		return nil, err
	}
	evictionConfig := eviction.Config{
		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
		Thresholds:               thresholds,
		KernelMemcgNotification:  kubeCfg.ExperimentalKernelMemcgNotification,
	}

	// The service indexer is only fed by a reflector when an API client is
	// available; otherwise the lister is backed by an empty indexer.
	serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	if kubeDeps.KubeClient != nil {
		serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
		cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0).Run()
	}
	serviceLister := corelisters.NewServiceLister(serviceIndexer)

	// The node reflector is restricted to this node via a field selector on
	// the object name.
	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	if kubeDeps.KubeClient != nil {
		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
		nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
		cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0).Run()
	}
	nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

	// TODO: get the real node object of ourself,
	// and use the real node name and UID.
	// TODO: what is namespace for node?
	nodeRef := &clientv1.ObjectReference{
		Kind:      "Node",
		Name:      string(nodeName),
		UID:       types.UID(nodeName),
		Namespace: "",
	}

	diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
	}
	containerRefManager := kubecontainer.NewRefManager()

	oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)

	// Invalid cluster DNS entries are logged and dropped rather than treated
	// as a fatal configuration error.
	clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
	for _, ipEntry := range kubeCfg.ClusterDNS {
		ip := net.ParseIP(ipEntry)
		if ip == nil {
			glog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
		} else {
			clusterDNS = append(clusterDNS, ip)
		}
	}
	httpClient := &http.Client{}

	klet := &Kubelet{
		hostname:                                hostname,
		nodeName:                                nodeName,
		kubeClient:                              kubeDeps.KubeClient,
		heartbeatClient:                         kubeDeps.HeartbeatClient,
		rootDirectory:                           kubeCfg.RootDirectory,
		resyncInterval:                          kubeCfg.SyncFrequency.Duration,
		sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
		registerNode:                            kubeCfg.RegisterNode,
		registerSchedulable:                     kubeCfg.RegisterSchedulable,
		standaloneMode:                          standaloneMode,
		clusterDomain:                           kubeCfg.ClusterDomain,
		clusterDNS:                              clusterDNS,
		serviceLister:                           serviceLister,
		nodeInfo:                                nodeInfo,
		masterServiceNamespace:                  kubeCfg.MasterServiceNamespace,
		streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
		recorder:                                kubeDeps.Recorder,
		cadvisor:                                kubeDeps.CAdvisorInterface,
		diskSpaceManager:                        diskSpaceManager,
		cloud:                                   kubeDeps.Cloud,
		autoDetectCloudProvider:                 (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider),
		externalCloudProvider:                   cloudprovider.IsExternal(kubeCfg.CloudProvider),
		providerID:                              providerID,
		nodeRef:                                 nodeRef,
		nodeLabels:                              kubeCfg.NodeLabels,
		nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
		os:                                      kubeDeps.OSInterface,
		oomWatcher:                              oomWatcher,
		cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
		cgroupRoot:                              kubeCfg.CgroupRoot,
		mounter:                                 kubeDeps.Mounter,
		writer:                                  kubeDeps.Writer,
		maxPods:                                 int(kubeCfg.MaxPods),
		podsPerCore:                             int(kubeCfg.PodsPerCore),
		syncLoopMonitor:                         atomic.Value{},
		resolverConfig:                          kubeCfg.ResolverConfig,
		daemonEndpoints:                         daemonEndpoints,
		containerManager:                        kubeDeps.ContainerManager,
		nodeIP:                                  net.ParseIP(nodeIP),
		clock:                                   clock.RealClock{},
		outOfDiskTransitionFrequency:            kubeCfg.OutOfDiskTransitionFrequency.Duration,
		enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
		iptClient:                               utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
		makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
		iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
		iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
	}

	// The secret and configmap managers derive their TTL from the node
	// object, so they can only be created once klet exists.
	secretManager := secret.NewCachingSecretManager(
		kubeDeps.KubeClient, secret.GetObjectTTLFromNodeFunc(klet.GetNode))
	klet.secretManager = secretManager

	configMapManager := configmap.NewCachingConfigMapManager(
		kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode))
	klet.configMapManager = configMapManager

	if klet.experimentalHostUserNamespaceDefaulting {
		glog.Infof("Experimental host user namespace defaulting is enabled.")
	}

	hairpinMode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, crOptions.NetworkPluginName)
	if err != nil {
		// This is a non-recoverable error. Returning it up the callstack will just
		// lead to retries of the same failure, so just fail hard.
		glog.Fatalf("Invalid hairpin mode: %v", err)
	}
	glog.Infof("Hairpin mode set to %q", hairpinMode)

	// TODO(#36485) Remove this workaround once we fix the init-container issue.
	// Touch iptables lock file, which will be shared among all processes accessing
	// the iptables.
	// Failures here are best-effort: they are logged but do not abort construction.
	f, err := os.OpenFile(utilipt.LockfilePath16x, os.O_CREATE, 0600)
	if err != nil {
		glog.Warningf("Failed to open iptables lock file: %v", err)
	} else if err = f.Close(); err != nil {
		glog.Warningf("Failed to close iptables lock file: %v", err)
	}

	plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, crOptions.NetworkPluginName, &criNetworkHost{&networkHost{klet}, &network.NoopPortMappingGetter{}}, hairpinMode, kubeCfg.NonMasqueradeCIDR, int(crOptions.NetworkPluginMTU))
	if err != nil {
		return nil, err
	}
	klet.networkPlugin = plug

	machineInfo, err := klet.GetCachedMachineInfo()
	if err != nil {
		return nil, err
	}

	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	klet.livenessManager = proberesults.NewManager()

	klet.podCache = kubecontainer.NewCache()
	// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)

	if kubeCfg.RemoteRuntimeEndpoint != "" {
		// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
		if kubeCfg.RemoteImageEndpoint == "" {
			kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
		}
	}

	// TODO: These need to become arguments to a standalone docker shim.
	binDir := crOptions.CNIBinDir
	if binDir == "" {
		binDir = crOptions.NetworkPluginDir
	}
	pluginSettings := dockershim.NetworkPluginSettings{
		HairpinMode:       hairpinMode,
		NonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
		PluginName:        crOptions.NetworkPluginName,
		PluginConfDir:     crOptions.CNIConfDir,
		PluginBinDir:      binDir,
		MTU:               int(crOptions.NetworkPluginMTU),
	}

	// Remote runtime shim just cannot talk back to kubelet, so it doesn't
	// support bandwidth shaping or hostports till #35457. To enable legacy
	// features, replace with networkHost.
	var nl *NoOpLegacyHost
	pluginSettings.LegacyRuntimeHost = nl

	// rktnetes cannot be run with CRI.
	if kubeCfg.ContainerRuntime != "rkt" {
		// kubelet defers to the runtime shim to setup networking. Setting
		// this to nil will prevent it from trying to invoke the plugin.
		// It's easier to always probe and initialize plugins till cri
		// becomes the default.
		klet.networkPlugin = nil

		switch kubeCfg.ContainerRuntime {
		case "docker":
			// Create and start the CRI shim running as a grpc server.
			streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
			ds, err := dockershim.NewDockerService(kubeDeps.DockerClient, kubeCfg.SeccompProfileRoot, crOptions.PodSandboxImage,
				streamingConfig, &pluginSettings, kubeCfg.RuntimeCgroups, kubeCfg.CgroupDriver, crOptions.DockerExecHandlerName,
				crOptions.DockershimRootDirectory, crOptions.DockerDisableSharedPID)
			if err != nil {
				return nil, err
			}
			if err := ds.Start(); err != nil {
				return nil, err
			}
			// For now, the CRI shim redirects the streaming requests to the
			// kubelet, which handles the requests using DockerService..
			klet.criHandler = ds

			// The unix socket for kubelet <-> dockershim communication.
			glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
				kubeCfg.RemoteRuntimeEndpoint,
				kubeCfg.RemoteImageEndpoint)
			glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
			server := dockerremote.NewDockerServer(kubeCfg.RemoteRuntimeEndpoint, ds)
			if err := server.Start(); err != nil {
				return nil, err
			}

			// Create dockerLegacyService when the logging driver is not supported.
			supported, err := dockershim.IsCRISupportedLogDriver(kubeDeps.DockerClient)
			if err != nil {
				return nil, err
			}
			if !supported {
				klet.dockerLegacyService = dockershim.NewDockerLegacyService(kubeDeps.DockerClient)
			}
		case "remote":
			// No-op: nothing to start locally for this runtime.
		default:
			return nil, fmt.Errorf("unsupported CRI runtime: %q", kubeCfg.ContainerRuntime)
		}
		runtimeService, imageService, err := getRuntimeAndImageServices(kubeCfg)
		if err != nil {
			return nil, err
		}
		runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
			kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
			klet.livenessManager,
			containerRefManager,
			machineInfo,
			klet.podManager,
			klet,
			kubeDeps.OSInterface,
			klet,
			httpClient,
			imageBackOff,
			kubeCfg.SerializeImagePulls,
			float32(kubeCfg.RegistryPullQPS),
			int(kubeCfg.RegistryBurst),
			kubeCfg.CPUCFSQuota,
			runtimeService,
			imageService,
		)
		if err != nil {
			return nil, err
		}
		klet.containerRuntime = runtime
		klet.runner = runtime
	} else {
		// rkt uses the legacy, non-CRI, integration. Configure it the old way.
		// TODO: Include hairpin mode settings in rkt?
		conf := &rkt.Config{
			Path:            crOptions.RktPath,
			Stage1Image:     crOptions.RktStage1Image,
			InsecureOptions: "image,ondisk",
		}
		runtime, err := rkt.New(
			crOptions.RktAPIEndpoint,
			conf,
			klet,
			kubeDeps.Recorder,
			containerRefManager,
			klet.podManager,
			klet.livenessManager,
			httpClient,
			klet.networkPlugin,
			hairpinMode == componentconfig.HairpinVeth,
			utilexec.New(),
			kubecontainer.RealOS{},
			imageBackOff,
			kubeCfg.SerializeImagePulls,
			float32(kubeCfg.RegistryPullQPS),
			int(kubeCfg.RegistryBurst),
			kubeCfg.RuntimeRequestTimeout.Duration,
		)
		if err != nil {
			return nil, err
		}
		klet.containerRuntime = runtime
		klet.runner = kubecontainer.DirectStreamingRunner(runtime)
	}

	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
	klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)

	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
	klet.updatePodCIDR(kubeCfg.PodCIDR)

	// setup containerGC
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
	if err != nil {
		return nil, err
	}
	klet.containerGC = containerGC
	// Keep at least minDeadContainerInPod dead containers per pod, even if
	// the GC policy asks for fewer.
	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

	// setup imageManager
	imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	klet.imageManager = imageManager

	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

	if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kubeDeps.TLSOptions != nil {
		// Decide which IPs the serving certificate should cover: the
		// configured address, or every non-loopback local IP when the
		// configured address is missing or unspecified.
		var ips []net.IP
		cfgAddress := net.ParseIP(kubeCfg.Address)
		if cfgAddress == nil || cfgAddress.IsUnspecified() {
			localIPs, err := allLocalIPsWithoutLoopback()
			if err != nil {
				return nil, err
			}
			ips = localIPs
		} else {
			ips = []net.IP{cfgAddress}
		}
		ips = append(ips, cloudIPs...)
		names := append([]string{klet.GetHostname(), hostnameOverride}, cloudNames...)
		klet.serverCertificateManager, err = initializeServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, ips, names)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
		}
		// Serve whatever certificate the manager currently holds, so that a
		// rotated certificate is picked up on the next TLS handshake.
		kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
			cert := klet.serverCertificateManager.Current()
			if cert == nil {
				return nil, fmt.Errorf("no certificate available")
			}
			return cert, nil
		}
	}

	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.runner,
		containerRefManager,
		kubeDeps.Recorder)

	klet.volumePluginMgr, err =
		NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, kubeDeps.VolumePlugins)
	if err != nil {
		return nil, err
	}

	// If the experimentalMounterPathFlag is set, we do not want to
	// check node capabilities since the mount path is not the default
	if len(kubeCfg.ExperimentalMounterPath) != 0 {
		kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount = false
	}
	// setup volumeManager
	klet.volumeManager, err = volumemanager.NewVolumeManager(
		kubeCfg.EnableControllerAttachDetach,
		nodeName,
		klet.podManager,
		klet.statusManager,
		klet.kubeClient,
		klet.volumePluginMgr,
		klet.containerRuntime,
		kubeDeps.Mounter,
		klet.getPodsDir(),
		kubeDeps.Recorder,
		kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount,
		kubeCfg.KeepTerminatedPodVolumes)
	// Check the volume manager error here; err is reassigned by the very
	// next statement, so an unchecked failure would be silently lost.
	if err != nil {
		return nil, err
	}

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache
	klet.reasonCache = NewReasonCache()
	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
	klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	// setup eviction manager
	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

	klet.evictionManager = evictionManager
	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

	// add sysctl admission
	runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	safeWhitelist, err := sysctl.NewWhitelist(sysctl.SafeSysctlWhitelist(), v1.SysctlsPodAnnotationKey)
	if err != nil {
		return nil, err
	}
	// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
	// Hence, we concatenate those two lists.
	safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
	unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, v1.UnsafeSysctlsPodAnnotationKey)
	if err != nil {
		return nil, err
	}
	klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
	klet.admitHandlers.AddPodAdmitHandler(safeWhitelist)
	klet.admitHandlers.AddPodAdmitHandler(unsafeWhitelist)

	// enable active deadline handler
	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
	if err != nil {
		return nil, err
	}
	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
	klet.AddPodSyncHandler(activeDeadlineHandler)

	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler))
	// apply functional Option's
	for _, opt := range kubeDeps.Options {
		opt(klet)
	}

	klet.appArmorValidator = apparmor.NewValidator(kubeCfg.ContainerRuntime)
	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
	if utilfeature.DefaultFeatureGate.Enabled(features.Accelerators) {
		if kubeCfg.ContainerRuntime == "docker" {
			if klet.gpuManager, err = nvidia.NewNvidiaGPUManager(klet, kubeDeps.DockerClient); err != nil {
				return nil, err
			}
		} else {
			glog.Errorf("Accelerators feature is supported with docker runtime only. Disabling this feature internally.")
		}
	}
	// Set GPU manager to a stub implementation if it is not enabled or cannot be supported.
	if klet.gpuManager == nil {
		klet.gpuManager = gpu.NewGPUManagerStub()
	}
	// Finally, put the most recent version of the config on the Kubelet, so
	// people can see how it was configured.
	klet.kubeletConfiguration = *kubeCfg
	return klet, nil
}
// serviceLister is the narrow view of a service lister that the Kubelet
// stores in its serviceLister field. List takes a label selector and returns
// services (presumably those matching the selector; the implementation is
// not visible here) or an error.
type serviceLister interface {
List(labels.Selector) ([]*v1.Service, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
// kubeletConfiguration is a copy of the configuration the kubelet was built
// from, kept so that people can see how it was configured.
kubeletConfiguration componentconfig.KubeletConfiguration
// NOTE(review): hostname and nodeName presumably identify the node this
// kubelet runs on (host name vs. API node name) - confirm where they are set.
hostname string
nodeName types.NodeName
// runtimeCache is queried for the pods currently running (see deletePod).
runtimeCache kubecontainer.RuntimeCache
// kubeClient is the API client. When nil, no node status update is sent
// (see Run).
kubeClient clientset.Interface
// NOTE(review): presumably a dedicated client for node status heartbeats -
// confirm against the code that uses it.
heartbeatClient v1coregenerated.CoreV1Interface
// NOTE(review): presumably the iptables interface used by the network util
// sync loop - confirm.
iptClient utilipt.Interface
// rootDirectory is the kubelet root directory; it is normalized with
// path.Clean in setupDataDirs.
rootDirectory string
// podWorkers handle syncing Pods in response to events.
podWorkers PodWorkers
// resyncInterval is the interval between periodic full reconciliations of
// pods on this node.
resyncInterval time.Duration
// sourcesReady records the sources seen by the kubelet, it is thread-safe.
sourcesReady config.SourcesReady
// podManager is a facade that abstracts away the various sources of pods
// this Kubelet services.
podManager kubepod.Manager
// Needed to observe and respond to situations that could impact node stability
evictionManager eviction.Manager
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
// Optional, defaults to simple Docker implementation
runner kubecontainer.ContainerCommandRunner
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// Set to true to have the node register itself as schedulable.
registerSchedulable bool
// for internal book keeping; access only from within registerWithApiserver
registrationCompleted bool
// Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
standaloneMode bool
// If non-empty, use this for container DNS search.
clusterDomain string
// If non-nil, use this for container DNS server.
clusterDNS []net.IP
// masterServiceNamespace is the namespace that the master service is exposed in.
masterServiceNamespace string
// serviceLister knows how to list services
serviceLister serviceLister
// nodeInfo knows how to get information about the node for this kubelet.
nodeInfo predicates.NodeInfo
// a list of node labels to register
nodeLabels map[string]string
// runtimeState holds the last timestamp when the runtime responded on ping;
// a mutex is used to protect this value. It is also asked for network
// errors before a pod is started (see syncPod).
runtimeState *runtimeState
// Volume plugins.
volumePluginMgr *volume.VolumePluginMgr
// Network plugin.
networkPlugin network.NetworkPlugin
// Handles container probing.
probeManager prober.Manager
// Manages container health check results.
livenessManager proberesults.Manager
// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
streamingConnectionIdleTimeout time.Duration
// The EventRecorder to use
recorder record.EventRecorder
// Policy for handling garbage collection of dead containers.
containerGC kubecontainer.ContainerGC
// Manager for image garbage collection.
imageManager images.ImageGCManager
// Diskspace manager.
diskSpaceManager diskSpaceManager
// Secret manager.
secretManager secret.Manager
// ConfigMap manager.
configMapManager configmap.Manager
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorapi.MachineInfo
// Cached RootFsInfo returned by cadvisor.
rootfsInfo *cadvisorapiv2.FsInfo
// Handles certificate rotations.
serverCertificateManager certificate.Manager
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager status.Manager
// VolumeManager runs a set of asynchronous loops that figure out which
// volumes need to be attached/mounted/unmounted/detached based on the pods
// scheduled on this node and makes it so.
volumeManager volumemanager.VolumeManager
// Cloud provider interface.
cloud cloudprovider.Interface
// NOTE(review): presumably set when the cloud provider should be
// auto-detected rather than configured explicitly - confirm.
autoDetectCloudProvider bool
// Indicates that the node initialization happens in an external cloud controller
externalCloudProvider bool
// Reference to this node.
nodeRef *clientv1.ObjectReference
// Container runtime.
containerRuntime kubecontainer.Runtime
// reasonCache caches the failure reason of the last creation of all containers, which is
// used for generating ContainerStatus.
reasonCache *ReasonCache
// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator
// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
// os is a facade for various syscalls that need to be mocked during testing.
os kubecontainer.OSInterface
// Watcher of out of memory events.
oomWatcher OOMWatcher
// Monitor resource usage
resourceAnalyzer stats.ResourceAnalyzer
// Whether or not we should have the QOS cgroup hierarchy for resource management
cgroupsPerQOS bool
// If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string
// Mounter to use for volumes.
mounter mount.Interface
// Writer interface to use for volumes.
writer kubeio.Writer
// Manager of non-Runtime containers.
containerManager cm.ContainerManager
// Maximum Number of Pods which can be run by this Kubelet
maxPods int
// Monitor Kubelet's sync loop
syncLoopMonitor atomic.Value
// Container restart Backoff
backOff *flowcontrol.Backoff
// Channel for sending pods to kill.
podKillingCh chan *kubecontainer.PodPair
// The configuration file used as the base to generate the container's
// DNS resolver configuration file. This can be used in conjunction with
// clusterDomain and clusterDNS.
resolverConfig string
// Optionally shape the bandwidth of a pod
// TODO: remove when kubenet plugin is ready
shaper bandwidth.BandwidthShaper
// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *v1.NodeDaemonEndpoints
// A queue used to trigger pod workers.
workQueue queue.WorkQueue
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
// If non-nil, use this IP address for the node
nodeIP net.IP
// If non-empty, this is a unique identifier for the node in an external database, eg. cloudprovider
providerID string
// clock is an interface that provides time related functionality in a way that makes it
// easy to test the code.
clock clock.Clock
// outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually
// not out of disk before it can transition the node condition status from out-of-disk to
// not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
// getting rescheduled onto the node.
outOfDiskTransitionFrequency time.Duration
// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(*v1.Node) error
// TODO: think about moving this to be centralized in PodWorkers in follow-on.
// the list of handlers to call during pod admission.
admitHandlers lifecycle.PodAdmitHandlers
// softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
// admission rule should be applied by a softAdmitHandler.
softAdmitHandlers lifecycle.PodAdmitHandlers
// the list of handlers to call during pod sync loop.
lifecycle.PodSyncLoopHandlers
// the list of handlers to call during pod sync.
lifecycle.PodSyncHandlers
// the number of allowed pods per core
podsPerCore int
// enableControllerAttachDetach indicates the Attach/Detach controller
// should manage attachment/detachment of volumes scheduled to this node,
// and disable kubelet from executing any attach/detach operations
enableControllerAttachDetach bool
// trigger deleting containers in a pod
containerDeletor *podContainerDeletor
// config iptables util rules
makeIPTablesUtilChains bool
// The bit of the fwmark space to mark packets for SNAT.
iptablesMasqueradeBit int
// The bit of the fwmark space to mark packets for dropping.
iptablesDropBit int
// The AppArmor validator for checking whether AppArmor is supported.
appArmorValidator apparmor.Validator
// The handler serving CRI streaming calls (exec/attach/port-forward).
criHandler http.Handler
// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
// or using host path volumes.
// This should only be enabled when the container runtime is performing user remapping AND if the
// experimental behavior is desired.
experimentalHostUserNamespaceDefaulting bool
// GPU Manager. Never nil after construction: a stub implementation is used
// when the feature is not enabled or cannot be supported.
gpuManager gpu.GPUManager
// dockerLegacyService contains some legacy methods for backward compatibility.
// It should be set only when docker is using non json-file logging driver.
dockerLegacyService dockershim.DockerLegacyService
}
// initializeServerCertificateManager creates the certificate manager for the
// kubelet serving certificate.
//
// The certificate is requested for the subject "system:node:<nodeName>" in the
// "system:nodes" organization, covering the given DNS names and IP addresses.
// Certificates are kept in a file store rooted at kubeCfg.CertDirectory. When
// kubeClient (or its certificates client) is nil, the manager is created
// without a certificate signing request client.
//
// An error is returned when the file store cannot be initialized, or when
// certificate.NewManager fails.
func initializeServerCertificateManager(kubeClient clientset.Interface, kubeCfg *componentconfig.KubeletConfiguration, nodeName types.NodeName, ips []net.IP, hostnames []string) (certificate.Manager, error) {
	var csrClient clientcertificates.CertificateSigningRequestInterface
	if kubeClient != nil && kubeClient.Certificates() != nil {
		csrClient = kubeClient.Certificates().CertificateSigningRequests()
	}

	store, err := certificate.NewFileStore(
		"kubelet-server",
		kubeCfg.CertDirectory,
		kubeCfg.CertDirectory,
		kubeCfg.TLSCertFile,
		kubeCfg.TLSPrivateKeyFile)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize certificate store: %v", err)
	}

	// Identity and subject alternative names requested for the certificate.
	template := &x509.CertificateRequest{
		Subject: pkix.Name{
			CommonName:   fmt.Sprintf("system:node:%s", nodeName),
			Organization: []string{"system:nodes"},
		},
		DNSNames:    hostnames,
		IPAddresses: ips,
	}

	// Key usages, see https://tools.ietf.org/html/rfc5280#section-4.2.1.3
	usages := []certificates.KeyUsage{
		// Digital signature allows the certificate to be used to verify
		// digital signatures used during TLS negotiation.
		certificates.UsageDigitalSignature,
		// KeyEncipherment allows the cert/key pair to be used to encrypt
		// keys, including the symmetric keys negotiated during TLS setup
		// and used for data transfer.
		certificates.UsageKeyEncipherment,
		// ServerAuth allows the cert to be used by a TLS server to
		// authenticate itself to a TLS client.
		certificates.UsageServerAuth,
	}

	managerConfig := &certificate.Config{
		CertificateSigningRequestClient: csrClient,
		Template:                        template,
		Usages:                          usages,
		CertificateStore:                store,
	}
	return certificate.NewManager(managerConfig)
}
// allLocalIPsWithoutLoopback returns the IP addresses assigned to the local
// network interfaces, leaving out loopback addresses.
//
// Only interface addresses of type *net.IPNet are considered; any other
// address type is ignored. The result is nil when no address qualifies.
//
// An error is returned when the interfaces cannot be listed, or when the
// addresses of any single interface cannot be listed.
func allLocalIPsWithoutLoopback() ([]net.IP, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return nil, fmt.Errorf("could not list network interfaces: %v", err)
	}
	var ips []net.IP
	for _, iface := range interfaces {
		addresses, err := iface.Addrs()
		if err != nil {
			// Error strings must not end with a newline: they are usually
			// embedded in a larger message by the caller.
			return nil, fmt.Errorf("could not list the addresses for network interface %v: %v", iface, err)
		}
		for _, address := range addresses {
			if ipNet, ok := address.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
				ips = append(ips, ipNet.IP)
			}
		}
	}
	return ips, nil
}
// setupDataDirs normalizes the root directory path and then makes sure the
// following directories exist (created with mode 0750 when missing):
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
// The directories are handled in that order; the first failure is returned
// and the remaining directories are not attempted.
func (kl *Kubelet) setupDataDirs() error {
	kl.rootDirectory = path.Clean(kl.rootDirectory)

	// Each path is resolved right before it is created, so the lookups see
	// the cleaned root directory.
	steps := []struct {
		dir    func() string
		errFmt string
	}{
		{kl.getRootDir, "error creating root directory: %v"},
		{kl.getPodsDir, "error creating pods directory: %v"},
		{kl.getPluginsDir, "error creating plugins directory: %v"},
	}
	for _, step := range steps {
		if err := os.MkdirAll(step.dir(), 0750); err != nil {
			return fmt.Errorf(step.errFmt, err)
		}
	}
	return nil
}
// StartGarbageCollection starts the garbage collection goroutines: one for
// containers, run every ContainerGCPeriod, and one for images, run every
// ImageGCPeriod. Both loops run until wait.NeverStop is closed.
//
// Container GC failures are logged and reported as an event every time.
// Image GC failures are always logged, but an event is only recorded when the
// previous attempt failed as well, because the first failure may simply mean
// that stats initialization has not completed yet. The first success after a
// failure is logged at verbosity 1 instead of 4.
func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			glog.Errorf("Container garbage collection failed: %v", err)
			// Event, not Eventf: the error text is data, not a format string,
			// and must not be interpreted for "%" verbs.
			kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel glog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}
			glog.V(vLevel).Infof("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	prevImageGCFailed := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			if prevImageGCFailed {
				glog.Errorf("Image garbage collection failed multiple times in a row: %v", err)
				// Only create an event for repeated failures. Event, not
				// Eventf, for the same reason as above.
				kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
			} else {
				glog.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
			}
			prevImageGCFailed = true
		} else {
			var vLevel glog.Level = 4
			if prevImageGCFailed {
				vLevel = 1
				prevImageGCFailed = false
			}
			glog.V(vLevel).Infof("Image garbage collection succeeded")
		}
	}, ImageGCPeriod, wait.NeverStop)
}
// initializeModules brings up the internal modules that can start without the
// container runtime being up.
// Note that the modules here must not depend on modules that are not initialized here.
//
// Failures to set up the data directories, to get the node, or to start the
// container manager or the OOM watcher are returned. Failures to create the
// container logs directory or to start the GPU manager are only logged.
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(kl.runtimeCache)

	// Setup filesystem directories.
	if dirErr := kl.setupDataDirs(); dirErr != nil {
		return dirErr
	}

	// Create the container logs directory when it cannot be stat'ed.
	_, statErr := os.Stat(ContainerLogsDir)
	if statErr != nil {
		mkdirErr := kl.os.MkdirAll(ContainerLogsDir, 0755)
		if mkdirErr != nil {
			glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, mkdirErr)
		}
	}

	// Image manager.
	kl.imageManager.Start()

	// Certificate manager, only when server certificate rotation is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
		kl.serverCertificateManager.Start()
	}

	// Container manager; it needs the node object.
	node, nodeErr := kl.getNodeAnyWay()
	if nodeErr != nil {
		return fmt.Errorf("Kubelet failed to get node info: %v", nodeErr)
	}
	cmErr := kl.containerManager.Start(node, kl.GetActivePods)
	if cmErr != nil {
		return fmt.Errorf("Failed to start ContainerManager %v", cmErr)
	}

	// Out of memory watcher.
	oomErr := kl.oomWatcher.Start(kl.nodeRef)
	if oomErr != nil {
		return fmt.Errorf("Failed to start OOM watcher %v", oomErr)
	}

	// GPUs.
	gpuErr := kl.gpuManager.Start()
	if gpuErr != nil {
		glog.Errorf("Failed to start gpuManager %v", gpuErr)
	}

	// Resource analyzer.
	kl.resourceAnalyzer.Start()
	return nil
}
// initializeRuntimeDependentModules starts the internal modules that need the
// container runtime to be up: cAdvisor first, then the eviction manager.
func (kl *Kubelet) initializeRuntimeDependentModules() {
	startErr := kl.cadvisor.Start()
	if startErr != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		// TODO(random-liu): Add backoff logic in the babysitter
		glog.Fatalf("Failed to start cAdvisor %v", startErr)
	}

	// The eviction manager is started after cadvisor because it needs to know
	// if the container runtime has a dedicated imagefs.
	kl.evictionManager.Start(
		kl.cadvisor,
		kl.GetActivePods,
		kl.podResourcesAreReclaimed,
		kl.containerManager,
		evictionMonitoringPeriod,
	)
}
// Run starts the kubelet reacting to config updates.
//
// It installs the default log server (/logs/ served from /var/log/) when none
// is set, initializes the runtime-independent modules, starts the background
// loops (volume manager, node status, network status, runtime health,
// iptables util chains, pod killer, resolv.conf limit checks), starts the
// status manager, the probe manager and the pod lifecycle event generator, and
// finally hands the updates channel to syncLoop.
//
// If module initialization fails, a KubeletSetupFailed event is recorded and
// the process is terminated through glog.Fatal.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}
	if err := kl.initializeModules(); err != nil {
		// Event, not Eventf: the error text is data, not a format string, and
		// must not be interpreted for "%" verbs.
		kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		glog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
	}
	go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start goroutine responsible for checking limits in resolv.conf
	if kl.resolverConfig != "" {
		go wait.Until(func() { kl.checkLimitsForResolvConf() }, 30*time.Second, wait.NeverStop)
	}

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
// GetKubeClient exposes the Kubernetes API client held by this kubelet. The
// result is nil when the kubelet has no API client.
// TODO: This is currently only required by network plugins. Replace
// with more specific methods.
func (kl *Kubelet) GetKubeClient() clientset.Interface {
	client := kl.kubeClient
	return client
}
// GetClusterDNS returns the DNS settings to use for the given pod.
//
// The results are, in order:
//   - the list of DNS servers,
//   - the list of DNS search domains,
//   - whether the cluster-first policy is in effect for the pod,
//   - an error when the host resolver configuration cannot be opened or parsed.
//
// The cluster-first policy applies to pods with the DNSClusterFirst policy
// that do not use the host network, and to pods with the
// DNSClusterFirstWithHostNet policy. If such a pod is seen while no cluster
// DNS IP is configured, warning events are recorded on the pod and on the node
// and the pod falls back to the default policy, i.e. the host DNS settings.
func (kl *Kubelet) GetClusterDNS(pod *v1.Pod) ([]string, []string, bool, error) {
	var hostDNS, hostSearch []string
	// Get host DNS settings
	if kl.resolverConfig != "" {
		f, err := os.Open(kl.resolverConfig)
		if err != nil {
			return nil, nil, false, err
		}
		defer f.Close()

		hostDNS, hostSearch, err = kl.parseResolvConf(f)
		if err != nil {
			return nil, nil, false, err
		}
	}
	useClusterFirstPolicy := ((pod.Spec.DNSPolicy == v1.DNSClusterFirst && !kubecontainer.IsHostNetworkPod(pod)) || pod.Spec.DNSPolicy == v1.DNSClusterFirstWithHostNet)
	if useClusterFirstPolicy && len(kl.clusterDNS) == 0 {
		// clusterDNS is not known.
		// pod with ClusterDNSFirst Policy cannot be created
		kl.recorder.Eventf(pod, v1.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
		message := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
		// The message is already formatted; record it with Event so that it
		// is not interpreted as a format string a second time.
		kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, "MissingClusterDNS", message)

		// fallback to DNSDefault
		useClusterFirstPolicy = false
	}

	if !useClusterFirstPolicy {
		// When the kubelet --resolv-conf flag is set to the empty string, use
		// DNS settings that override the docker default (which is to use
		// /etc/resolv.conf) and effectively disable DNS lookups. According to
		// the bind documentation, the behavior of the DNS client library when
		// "nameservers" are not specified is to "use the nameserver on the
		// local machine". A nameserver setting of localhost is equivalent to
		// this documented behavior.
		if kl.resolverConfig == "" {
			hostDNS = []string{"127.0.0.1"}
			hostSearch = []string{"."}
		} else {
			hostSearch = kl.formDNSSearchForDNSDefault(hostSearch, pod)
		}
		return hostDNS, hostSearch, useClusterFirstPolicy, nil
	}

	// for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
	// the pod. The cluster DNS server itself will forward queries to other nameservers that is configured to use,
	// in case the cluster DNS server cannot resolve the DNS query itself
	dns := make([]string, len(kl.clusterDNS))
	for i, ip := range kl.clusterDNS {
		dns[i] = ip.String()
	}
	dnsSearch := kl.formDNSSearch(hostSearch, pod)
	return dns, dnsSearch, useClusterFirstPolicy, nil
}
// syncPod is the transaction script for the sync of a single pod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the update is a kill request, set the pod status, kill the pod and return
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
//   start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create the pod's cgroups if they do not exist
// * Create a mirror pod if the pod is a static pod, and does not
//   already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If a step of this workflow errors, the error is returned, and the step is
// repeated on the next syncPod call. The exceptions are failures to delete or
// create a mirror pod and failures to update the QoS cgroups, which are only
// logged.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
	// pull out the required options
	pod := o.pod
	mirrorPod := o.mirrorPod
	podStatus := o.podStatus
	updateType := o.updateType

	// if we want to kill a pod, do it now!
	if updateType == kubetypes.SyncPodKill {
		killPodOptions := o.killPodOptions
		if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
			return fmt.Errorf("kill pod options are required if update type is kill")
		}
		apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
		kl.statusManager.SetPodStatus(pod, apiPodStatus)
		// we kill the pod with the specified grace period since this is a termination
		if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
			// there was an error killing the pod, so we return that error directly
			utilruntime.HandleError(err)
			return err
		}
		return nil
	}

	// Latency measurements for the main workflow are relative to the
	// first time the pod was seen by the API server.
	var firstSeenTime time.Time
	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
	}

	// Record pod worker start latency if being created
	// TODO: make pod workers record their own latencies
	if updateType == kubetypes.SyncPodCreate {
		if !firstSeenTime.IsZero() {
			// This is the first time we are syncing the pod. Record the latency
			// since kubelet first saw the pod if firstSeenTime is set.
			metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
		} else {
			glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
		}
	}

	// Generate final API pod status with pod and status manager status
	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
	// set pod IP to hostIP directly in runtime.GetPodStatus
	podStatus.IP = apiPodStatus.PodIP

	// Record the time it takes for the pod to become running.
	// NOTE(review): && binds tighter than ||, so when the status manager has
	// no status for the pod the latency is observed even if firstSeenTime is
	// zero. The parentheses only make the existing precedence explicit.
	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok || (existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
		!firstSeenTime.IsZero()) {
		metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
	}

	runnable := kl.canRunPod(pod)
	if !runnable.Admit {
		// Pod is not runnable; update the Pod and Container statuses to why.
		apiPodStatus.Reason = runnable.Reason
		apiPodStatus.Message = runnable.Message
		// Waiting containers are not creating.
		// The loops below range over copies of the statuses, but Waiting is
		// written through, so the change is visible in apiPodStatus.
		const waitingReason = "Blocked"
		for _, cs := range apiPodStatus.InitContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
		for _, cs := range apiPodStatus.ContainerStatuses {
			if cs.State.Waiting != nil {
				cs.State.Waiting.Reason = waitingReason
			}
		}
	}

	// Update status in the status manager
	kl.statusManager.SetPodStatus(pod, apiPodStatus)

	// Kill pod if it should not be running
	if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
		var syncErr error
		if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
			syncErr = fmt.Errorf("error killing pod: %v", err)
			utilruntime.HandleError(syncErr)
		} else {
			if !runnable.Admit {
				// There was no error killing the pod, but the pod cannot be run.
				// Return an error to signal that the sync loop should back off.
				syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
			}
		}
		return syncErr
	}

	// If the network plugin is not ready, only start the pod if it uses the host network
	if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
		return fmt.Errorf("network is not ready: %v", rs)
	}

	// Create Cgroups for the pod and apply resource parameters
	// to them if cgroups-per-qos flag is enabled.
	pcm := kl.containerManager.NewPodContainerManager()
	// If pod has already been terminated then we need not create
	// or update the pod's cgroup
	if !kl.podIsTerminated(pod) {
		// When the kubelet is restarted with the cgroups-per-qos
		// flag enabled, all the pod's running containers
		// should be killed intermittently and brought back up
		// under the qos cgroup hierarchy.
		// Check if this is the pod's first sync
		firstSync := true
		for _, containerStatus := range apiPodStatus.ContainerStatuses {
			if containerStatus.State.Running != nil {
				firstSync = false
				break
			}
		}
		// Don't kill containers in pod if pod's cgroups already
		// exists or the pod is running for the first time
		podKilled := false
		if !pcm.Exists(pod) && !firstSync {
			if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
				podKilled = true
			}
		}
		// Create and Update pod's Cgroups
		// Don't create cgroups for run once pod if it was killed above
		// The current policy is not to restart the run once pods when
		// the kubelet is restarted with the new flag as run once pods are
		// expected to run only once and if the kubelet is restarted then
		// they are not expected to run again.
		// We don't create and apply updates to cgroup if its a run once pod and was killed above
		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
			if !pcm.Exists(pod) {
				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
					glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
				}
				if err := pcm.EnsureExists(pod); err != nil {
					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
				}
			}
		}
	}

	// Create Mirror Pod for Static Pod if it doesn't already exist
	if kubepod.IsStaticPod(pod) {
		podFullName := kubecontainer.GetPodFullName(pod)
		deleted := false
		if mirrorPod != nil {
			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
				// The mirror pod is semantically different from the static pod. Remove
				// it. The mirror pod will get recreated later.
				glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
				if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
					glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
				} else {
					deleted = true
				}
			}
		}
		if mirrorPod == nil || deleted {
			glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
			if err := kl.podManager.CreateMirrorPod(pod); err != nil {
				glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
			}
		}
	}

	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
		return err
	}

	// Wait for volumes to attach/mount
	if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
		return err
	}

	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	// Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	if err := result.Error(); err != nil {
		return err
	}

	// early successful exit if pod is not bandwidth-constrained
	if !kl.shapingEnabled() {
		return nil
	}

	// Update the traffic shaping for the pod's ingress and egress limits
	ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
	if err != nil {
		return err
	}
	if egress != nil || ingress != nil {
		if kubecontainer.IsHostNetworkPod(pod) {
			kl.recorder.Event(pod, v1.EventTypeWarning, events.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network")
		} else if kl.shaper != nil {
			if len(apiPodStatus.PodIP) > 0 {
				// A shaping failure is a sync failure: report it so that the
				// step is retried on the next sync instead of being dropped.
				if shapeErr := kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress); shapeErr != nil {
					return fmt.Errorf("failed to reconcile traffic shaping for pod %q: %v", format.Pod(pod), shapeErr)
				}
			}
		} else {
			kl.recorder.Event(pod, v1.EventTypeWarning, events.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined")
		}
	}

	return nil
}
// getPodsToSync returns the pods that should be resynchronized, in the order
// reported by the pod manager. A pod qualifies when:
// * its work is ready, i.e. its UID was handed out by the work queue, or
// * an internal module requests a sync, i.e. one of the pod sync loop
//   handlers reports that it should be synced.
// The handlers are not consulted for pods whose work is already ready.
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
	pods := kl.podManager.GetPods()

	// UIDs of the pods whose work is ready.
	ready := sets.NewString()
	for _, uid := range kl.workQueue.GetWork() {
		ready.Insert(string(uid))
	}

	var selected []*v1.Pod
	for _, candidate := range pods {
		needsSync := ready.Has(string(candidate.UID))
		if !needsSync {
			// Stop at the first handler that asks for a sync.
			for _, handler := range kl.PodSyncLoopHandlers {
				if handler.ShouldSync(candidate) {
					needsSync = true
					break
				}
			}
		}
		if needsSync {
			selected = append(selected, candidate)
		}
	}
	return selected
}
// deletePod removes the pod from the internal state of the kubelet by:
// 1. forgetting the associated pod worker
// 2. signaling to kill the pod by sending on the podKillingCh channel
//
// An error is returned when pod is nil, when not all sources are ready, when
// the running pods cannot be listed, or when the pod is not found in the
// runtime cache.
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
	switch {
	case pod == nil:
		return fmt.Errorf("deletePod does not allow nil pod")
	case !kl.sourcesReady.AllReady():
		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
		// for sources that haven't reported yet.
		return fmt.Errorf("skipping delete because sources aren't ready yet")
	}

	kl.podWorkers.ForgetWorker(pod.UID)

	// Runtime cache may not have been updated with the pod yet, but that is
	// okay because the periodic cleanup routine will attempt to delete again
	// later.
	running, listErr := kl.runtimeCache.GetPods()
	if listErr != nil {
		return fmt.Errorf("error listing containers: %v", listErr)
	}
	target := kubecontainer.Pods(running).FindPod("", pod.UID)
	if target.IsEmpty() {
		return fmt.Errorf("pod not found")
	}

	kl.podKillingCh <- &kubecontainer.PodPair{APIPod: pod, RunningPod: &target}
	// TODO: delete the mirror pod here?

	// We leave the volume/directory cleanup to the periodic cleanup routine.
	return nil
}
// isOutOfDisk reports whether pods can't fit due to lack of disk space.
//
// The disk space is checked once globally, first for the runtime and then for
// the root partition; the node is out of disk as soon as one check reports
// that space is not available. A check that fails with an error is logged and
// treated as "enough space".
func (kl *Kubelet) isOutOfDisk() bool {
	runtimeOK, err := kl.diskSpaceManager.IsRuntimeDiskSpaceAvailable()
	switch {
	case err != nil:
		// Assume enough space in case of errors.
		glog.Errorf("Failed to check if disk space is available for the runtime: %v", err)
	case !runtimeOK:
		return true
	}

	rootOK, err := kl.diskSpaceManager.IsRootDiskSpaceAvailable()
	if err != nil {
		// Assume enough space in case of errors.
		glog.Errorf("Failed to check if disk space is available on the root partition: %v", err)
		return false
	}
	return !rootOK
}
// rejectPod records a warning event about the pod with the given reason and
// message, and updates the pod to the failed phase in the status manager. The
// status message is the given message prefixed with "Pod ".
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
	// Event, not Eventf: message is caller-supplied text, not a format
	// string, and must not be interpreted for "%" verbs.
	kl.recorder.Event(pod, v1.EventTypeWarning, reason, message)
	kl.statusManager.SetPodStatus(pod, v1.PodStatus{
		Phase:   v1.PodFailed,
		Reason:  reason,
		Message: "Pod " + message})
}
// canAdmitPod decides whether a pod can be admitted. "pod" is the new pod,
// while "pods" are all admitted pods.
// It returns a boolean value indicating whether the pod can be admitted, and,
// when it cannot, a brief single-word reason and a message explaining why.
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// The pod admit handlers are invoked in sequence; the first rejection
	// rejects the pod.
	// TODO: move out of disk check into a pod admitter
	// TODO: out of resource eviction should have a pod admitter call-out
	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
	for _, handler := range kl.admitHandlers {
		result := handler.Admit(attrs)
		if !result.Admit {
			return false, result.Reason, result.Message
		}
	}

	// TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and
	// add the disk space predicate to predicates.GeneralPredicates.
	if !kl.isOutOfDisk() {
		return true, "", ""
	}
	glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to OutOfDisk")
	return false, "OutOfDisk", "cannot be started due to lack of disk space."
}
// canRunPod evaluates the soft admit handlers for the pod and then the
// package-level canRunPod check. The first rejecting handler's result is
// returned as-is; an error from the package-level check is reported as a
// rejection with reason "Forbidden" and the error text as the message.
func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
	// "OtherPods" holds the pod manager's pods with terminated ones filtered
	// out. Rejected pods are failed, so only admitted pods that are alive remain.
	attrs := &lifecycle.PodAdmitAttributes{
		Pod:       pod,
		OtherPods: kl.filterOutTerminatedPods(kl.podManager.GetPods()),
	}

	for _, softHandler := range kl.softAdmitHandlers {
		verdict := softHandler.Admit(attrs)
		if !verdict.Admit {
			return verdict
		}
	}

	// TODO: Refactor as a soft admit handler.
	err := canRunPod(pod)
	if err == nil {
		return lifecycle.PodAdmitResult{Admit: true}
	}
	return lifecycle.PodAdmitResult{
		Admit:   false,
		Reason:  "Forbidden",
		Message: err.Error(),
	}
}
// syncLoop is the main loop for processing changes. It repeatedly calls
// syncLoopIteration with the update channel, a one-second sync ticker, a
// housekeeping ticker and the PLEG event channel, recording the current time
// in the sync loop monitor before and after each iteration. While the runtime
// state reports errors, pod synchronization is skipped and the loop sleeps
// for five seconds before checking again. The loop ends only when
// syncLoopIteration returns false.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")

	// Pause applied between checks while the runtime is reporting errors.
	const runtimeErrorPause = 5 * time.Second

	// The syncTicker wakes up the kubelet to check if there are any pod
	// workers that need to be sync'd. A one-second period is sufficient
	// because the sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()

	for {
		runtimeErrs := kl.runtimeState.runtimeErrors()
		if len(runtimeErrs) > 0 {
			glog.Infof("skipping pod synchronization - %v", runtimeErrs)
			time.Sleep(runtimeErrorPause)
			continue
		}

		kl.syncLoopMonitor.Store(kl.clock.Now())
		keepGoing := kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
		if !keepGoing {
			// The closing timestamp is intentionally not stored on exit.
			return
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  housekeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels and handle that event.
// Exactly one event is handled per call.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
//             handler callback for the event type
// * plegCh: sync the pod the event refers to; clean up containers of the pod
//           when a container died
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * liveness manager: sync pods in which one or more containers have failed
//                     liveness checks
//
// Returns false only when configCh has been closed, which tells the caller
// to leave the sync loop; returns true in every other case.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			glog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			// DELETE is treated as an UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			glog.Errorf("Kubelet does not support snapshot update")
		}

		// Mark the source ready after receiving at least one update from the
		// source. Once all the sources are marked ready, various cleanup
		// routines will start reclaiming resources. It is important that this
		// takes place only after kubelet calls the update handler to process
		// the update to ensure the internal pod cache is up-to-date.
		kl.sourcesReady.AddSource(u.Source)
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		}

		// A died container triggers container cleanup for its pod, but only
		// when the event data carries the container ID as a string.
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
		// Dispatch through the supplied handler, like every other case in
		// this select, instead of calling the kubelet's own method directly.
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			// The liveness manager detected a failure; sync the pod.

			// We should not use the pod from livenessManager, because it is never updated after
			// initialization.
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
				break
			}
			glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready, skip housekeeping, as we may
			// accidentally delete pods from unready sources.
			glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			glog.V(4).Infof("SyncLoop (housekeeping)")
			if err := handler.HandlePodCleanups(); err != nil {
				glog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork does not hand it to a pod worker;
// it only asks the status manager to terminate the pod when the pod's
// DeletionTimestamp is set, and then returns.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	if kl.podIsTerminated(pod) {
		if pod.DeletionTimestamp == nil {
			return
		}
		// If the pod is in a terminated state, there is no pod worker to
		// handle the work item. Since the DeletionTimestamp has been set,
		// force a status update to trigger a pod deletion request to the
		// apiserver.
		kl.statusManager.TerminatePod(pod)
		return
	}

	// Callback invoked by the pod worker with the outcome of the sync.
	// NOTE(review): the latency metric is observed only when the sync reports
	// an error; confirm this is intended rather than an inverted check.
	onComplete := func(err error) {
		if err == nil {
			return
		}
		metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
	}

	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:            pod,
		MirrorPod:      mirrorPod,
		UpdateType:     syncType,
		OnCompleteFunc: onComplete,
	})

	// Note the number of containers for new pods.
	if syncType != kubetypes.SyncPodCreate {
		return
	}
	metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
// handleMirrorPod dispatches an update for the pod that the pod manager
// associates with the given mirror pod. Nothing happens when the pod manager
// has no such pod.
//
// TODO: handle mirror pods in a separate component (issue #17251)
func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
	// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
	// corresponding static pod, so the work is sent to the pod worker only if
	// the static pod exists.
	staticPod, found := kl.podManager.GetPodByMirrorPod(mirrorPod)
	if !found {
		return
	}
	kl.dispatchWork(staticPod, kubetypes.SyncPodUpdate, mirrorPod, start)
}
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source. Pods are sorted by creation time and then handled one by
// one: each is added to the pod manager, mirror pods are routed to
// handleMirrorPod, non-terminated pods go through admission (and are rejected
// on failure), and the remaining pods are dispatched as creations and
// registered with the probe manager.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))

	for idx := range pods {
		pod := pods[idx]

		// Fetched before the pod itself is added to the pod manager below.
		existingPods := kl.podManager.GetPods()

		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// Admission only applies to pods that are not terminated.
		if !kl.podIsTerminated(pod) {
			// We failed pods that we rejected, so activePods include all
			// admitted pods that are alive.
			activePods := kl.filterOutTerminatedPods(existingPods)

			admitted, reason, message := kl.canAdmitPod(activePods, pod)
			if !admitted {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}

		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source. Every pod is updated in the pod
// manager; mirror pods are then routed to handleMirrorPod and all other pods
// are dispatched as updates.
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	start := kl.clock.Now()
	for idx := range pods {
		pod := pods[idx]
		kl.podManager.UpdatePod(pod)

		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// TODO: Evaluate if we need to validate and reject updates.
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}
// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source. Every pod is deleted from the pod
// manager; mirror pods are then routed to handleMirrorPod, while all other
// pods are deleted and removed from the probe manager.
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	start := kl.clock.Now()
	for idx := range pods {
		pod := pods[idx]
		kl.podManager.DeletePod(pod)

		if kubepod.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// Deletion is allowed to fail because the periodic cleanup routine
		// will trigger deletion again; the failure is only logged.
		err := kl.deletePod(pod)
		if err != nil {
			glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
		}
		kl.probeManager.RemovePod(pod)
	}
}
// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled. Each pod is updated in the pod manager; for
// evicted pods whose status can be read from the pod cache, all containers in
// the pod are additionally handed to the container deletor for removal.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
	for idx := range pods {
		pod := pods[idx]

		// Update the pod in pod manager, status manager will do periodically
		// reconcile according to the pod manager.
		kl.podManager.UpdatePod(pod)

		// After an evicted pod is synced, all dead containers in the pod can be removed.
		if !eviction.PodIsEvicted(pod.Status) {
			continue
		}
		podStatus, err := kl.podCache.Get(pod.UID)
		if err != nil {
			// Without a cached status there is nothing to delete; the error is dropped.
			continue
		}
		kl.containerDeletor.deleteContainersInPod("", podStatus, true)
	}
}
// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync. Each pod is dispatched
// together with whatever mirror pod the pod manager returns for it.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for idx := range pods {
		target := pods[idx]
		mirror, _ := kl.podManager.GetMirrorPodByPod(target)
		kl.dispatchWork(target, kubetypes.SyncPodSync, mirror, start)
	}
}
// LatestLoopEntryTime returns the time most recently stored in the sync loop
// monitor, or the zero time.Time when nothing has been stored yet.
func (kl *Kubelet) LatestLoopEntryTime() time.Time {
	if stored := kl.syncLoopMonitor.Load(); stored != nil {
		return stored.(time.Time)
	}
	return time.Time{}
}
// updateRuntimeUp queries the container runtime status. When the status
// check succeeds it runs the one-time initialization of the runtime dependent
// modules and records the current time as the runtime sync time in the
// kubelet runtimeState. Failures are logged and end the call early; nothing
// is returned to the caller.
//
// For every runtime other than "rkt" the reported conditions are inspected as
// well: the network state in runtimeState is set from the NetworkReady
// condition, and a missing or false RuntimeReady condition ends the call
// before the sync time is updated.
func (kl *Kubelet) updateRuntimeUp() {
	status, err := kl.containerRuntime.Status()
	if err != nil {
		glog.Errorf("Container runtime sanity check failed: %v", err)
		return
	}

	// rkt uses the legacy, non-CRI integration. Don't check the runtime
	// conditions for it.
	checkConditions := kl.kubeletConfiguration.ContainerRuntime != "rkt"
	if checkConditions {
		if status == nil {
			glog.Errorf("Container runtime status is nil")
			return
		}

		// Periodically log the whole runtime status for debugging.
		// TODO(random-liu): Consider to send node event when optional
		// condition is unmet.
		glog.V(4).Infof("Container runtime status: %v", status)

		networkReady := status.GetRuntimeCondition(kubecontainer.NetworkReady)
		if networkReady != nil && networkReady.Status {
			// Set nil if the container runtime network is ready.
			kl.runtimeState.setNetworkState(nil)
		} else {
			glog.Errorf("Container runtime network not ready: %v", networkReady)
			kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
		}

		// TODO(random-liu): Add runtime error in runtimeState, and update it
		// when runtime is not ready, so that the information in RuntimeReady
		// condition will be propagated to NodeReady condition.
		runtimeReady := status.GetRuntimeCondition(kubecontainer.RuntimeReady)
		// If RuntimeReady is not set or is false, report an error.
		if runtimeReady == nil || !runtimeReady.Status {
			glog.Errorf("Container runtime not ready: %v", runtimeReady)
			return
		}
	}

	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
	kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
// updateCloudProviderFromMachineInfo sets the node's provider ID from the
// given cadvisor machine info. The node is left untouched when the machine
// info reports an unknown provider or bare metal.
func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *v1.Node, info *cadvisorapi.MachineInfo) {
	if info.CloudProvider == cadvisorapi.UnknownProvider || info.CloudProvider == cadvisorapi.Baremetal {
		return
	}

	// The cloud providers from pkg/cloudprovider/providers/* that update ProviderID
	// will use the format of cloudprovider://project/availability_zone/instance_name
	// here we only have the cloudprovider and the instance name so we leave project
	// and availability zone empty for compatibility.
	provider := strings.ToLower(string(info.CloudProvider))
	instance := string(info.InstanceID)
	node.Spec.ProviderID = provider + ":////" + instance
}
// GetConfiguration returns, by value, the KubeletConfiguration held by the
// kubelet.
func (kl *Kubelet) GetConfiguration() componentconfig.KubeletConfiguration {
	cfg := kl.kubeletConfiguration
	return cfg
}
// BirthCry records a normal-type StartingKubelet event against the kubelet's
// node reference to announce that the kubelet has started up.
func (kl *Kubelet) BirthCry() {
	kl.recorder.Eventf(
		kl.nodeRef,
		v1.EventTypeNormal,
		events.StartingKubelet,
		"Starting kubelet.",
	)
}
// StreamingConnectionIdleTimeout returns the idle timeout the kubelet holds
// for streaming connections to the HTTP server.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
	idleTimeout := kl.streamingConnectionIdleTimeout
	return idleTimeout
}
// ResyncInterval returns the kubelet's configured interval for periodic
// syncs.
func (kl *Kubelet) ResyncInterval() time.Duration {
	interval := kl.resyncInterval
	return interval
}
// ListenAndServe runs the kubelet HTTP server by delegating to
// server.ListenAndServeKubeletServer, passing along the caller's arguments
// together with the kubelet itself, its resource analyzer, its container
// runtime and its CRI handler.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) {
	server.ListenAndServeKubeletServer(
		kl,
		kl.resourceAnalyzer,
		address,
		port,
		tlsOptions,
		auth,
		enableDebuggingHandlers,
		enableContentionProfiling,
		kl.containerRuntime,
		kl.criHandler,
	)
}
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode by
// delegating to server.ListenAndServeKubeletReadOnlyServer with the kubelet,
// its resource analyzer and its container runtime.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
	server.ListenAndServeKubeletReadOnlyServer(
		kl,
		kl.resourceAnalyzer,
		address,
		port,
		kl.containerRuntime,
	)
}
// cleanUpContainersInPod asks the container deletor to delete container
// instances of the given pod, passing along the ID of the container that
// exited. All containers are requested for removal only when the pod is known
// to the pod manager and its status marks it as evicted. Nothing is done when
// the pod's status cannot be read from the pod cache.
func (kl *Kubelet) cleanUpContainersInPod(podId types.UID, exitedContainerID string) {
	podStatus, err := kl.podCache.Get(podId)
	if err != nil {
		return
	}

	// When an evicted pod has already synced, all containers can be removed.
	syncedPod, found := kl.podManager.GetPodByUID(podId)
	removeAll := found && eviction.PodIsEvicted(syncedPod.Status)

	kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
}
// isSyncPodWorthy reports whether a PLEG event warrants a pod sync. Every
// event type does, except ContainerRemoved.
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
	// ContainerRemoved doesn't affect pod state, so it is filtered out.
	if event.Type == pleg.ContainerRemoved {
		return false
	}
	return true
}
// getStreamingConfig builds the streaming server configuration to use with
// in-process CRI shims. The idle timeout comes from the kubelet
// configuration, the remaining timeouts and protocol lists come from
// streaming.DefaultConfig, and the TLS config is copied from the kubelet
// dependencies when TLS options are present.
func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps) *streaming.Config {
	// Use a relative redirect (no scheme or host).
	baseURL := &url.URL{Path: "/cri/"}

	config := &streaming.Config{
		BaseURL:                         baseURL,
		StreamIdleTimeout:               kubeCfg.StreamingConnectionIdleTimeout.Duration,
		StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
		SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
		SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
	}

	if tlsOptions := kubeDeps.TLSOptions; tlsOptions != nil {
		config.TLSConfig = tlsOptions.Config
	}
	return config
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。