/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache
import (
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/klog"
)
var (
cleanAssumedPeriod = 1 * time.Second
)
// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod is kept before it expires.
// "stop" is the channel that, when closed, stops the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
cache.run()
return cache
}
// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
info *schedulernodeinfo.NodeInfo
next *nodeInfoListItem
prev *nodeInfoListItem
}
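// schedulerCache implements the Cache interface. It keeps the pod and node state
// the scheduler reads on every scheduling cycle, including "assumed" pods that
// have been scheduled but whose bindings may not have completed yet.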
type schedulerCache struct {
stop <-chan struct{}
ttl time.Duration
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.RWMutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*nodeInfoListItem
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
nodeTree *NodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
type podState struct {
pod *v1.Pod
// Used to determine when an assumed pod expires.
deadline *time.Time
// Used to block the cache from expiring the assumed pod while binding is still in progress.
bindingFinished bool
}
type imageState struct {
// Size of the image
size int64
// A set of node names for nodes having this image present
nodes sets.String
}
// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulernodeinfo.ImageStateSummary {
return &schedulernodeinfo.ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
}
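// newSchedulerCache returns an empty schedulerCache with the given assumed-pod
// ttl, cleanup period, and stop channel.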
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
return &schedulerCache{
ttl: ttl,
period: period,
stop: stop,
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
imageStates: make(map[string]*imageState),
}
}
// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
return &nodeInfoListItem{
info: ni,
}
}
// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it.
func NewNodeInfoSnapshot() NodeInfoSnapshot {
return NodeInfoSnapshot{
NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
}
}
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
ni, ok := cache.nodes[name]
if !ok {
klog.Errorf("No NodeInfo with name %v found in the cache", name)
return
}
// if the node info list item is already at the head, we are done.
if ni == cache.headNode {
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
if cache.headNode != nil {
cache.headNode.prev = ni
}
ni.next = cache.headNode
ni.prev = nil
cache.headNode = ni
}
// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
ni, ok := cache.nodes[name]
if !ok {
klog.Errorf("No NodeInfo with name %v found in the cache", name)
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
// if the removed item was at the head, we must update the head.
if ni == cache.headNode {
cache.headNode = ni.next
}
delete(cache.nodes, name)
}
// Snapshot takes a snapshot of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with the UpdateNodeInfoSnapshot
// function.
// This method is expensive and should only be used off the critical path.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes))
for k, v := range cache.nodes {
nodes[k] = v.info.Clone()
}
assumedPods := make(map[string]bool, len(cache.assumedPods))
for k, v := range cache.assumedPods {
assumedPods[k] = v
}
return &Snapshot{
Nodes: nodes,
AssumedPods: assumedPods,
}
}
// UpdateNodeInfoSnapshot takes a snapshot of the cached NodeInfo map. This is called at
// the beginning of every scheduling cycle.
// This function tracks the generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.Generation
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.GetGeneration() <= snapshotGeneration {
// All the remaining nodes were updated before the existing snapshot was taken. We are done.
break
}
if balancedVolumesEnabled && node.info.TransientInfo != nil {
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
if np := node.info.Node(); np != nil {
nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
}
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
}
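// A snapshot with more entries than the cache means some nodes were removed
// from the cache; drop their stale entries from the snapshot.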
if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
for name := range nodeSnapshot.NodeInfoMap {
if _, ok := cache.nodes[name]; !ok {
delete(nodeSnapshot.NodeInfoMap, name)
}
}
}
return nil
}
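// List returns all cached pods matching the given label selector.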
func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
alwaysTrue := func(p *v1.Pod) bool { return true }
return cache.FilteredList(alwaysTrue, selector)
}
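// FilteredList returns the cached pods that satisfy both the pod filter and the
// label selector.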
func (cache *schedulerCache) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
maxSize := 0
for _, n := range cache.nodes {
maxSize += len(n.info.Pods())
}
pods := make([]*v1.Pod, 0, maxSize)
for _, n := range cache.nodes {
for _, pod := range n.info.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
}
}
}
return pods, nil
}
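// AssumePod records the pod in the cache optimistically, before its binding is
// confirmed, so that its resource usage is visible to subsequent scheduling
// cycles. It fails if the pod is already in the cache.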
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
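// FinishBinding marks the assumed pod's binding as finished and arms its
// expiration deadline (now + ttl).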
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests deterministic by injecting "now" as an argument.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
if ok && cache.assumedPods[key] {
dl := now.Add(cache.ttl)
currState.bindingFinished = true
currState.deadline = &dl
}
return nil
}
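// ForgetPod removes an assumed pod from the cache, e.g. when binding fails.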
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
switch {
// Only an assumed pod can be forgotten.
case ok && cache.assumedPods[key]:
err := cache.removePod(pod)
if err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
}
return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
n.info.AddPod(pod)
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
if err := cache.removePod(oldPod); err != nil {
return err
}
cache.addPod(newPod)
return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
return fmt.Errorf("node %v is not found", pod.Spec.NodeName)
}
if err := n.info.RemovePod(pod); err != nil {
return err
}
if len(n.info.Pods()) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
return nil
}
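// AddPod either confirms a previously assumed pod (clearing its expiration
// deadline) or re-adds a pod whose assumed entry has already expired.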
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods[key]:
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
// Clean this up.
cache.removePod(currState.pod)
cache.addPod(pod)
}
delete(cache.assumedPods, key)
cache.podStates[key].deadline = nil
cache.podStates[key].pod = pod
case !ok:
// Pod was expired. We should add it back.
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
default:
return fmt.Errorf("pod %v was already in added state", key)
}
return nil
}
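// UpdatePod replaces oldPod with newPod in the cache. The pod must already be
// in the Added (not Assumed) state.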
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
key, err := schedulernodeinfo.GetPodKey(oldPod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have an Update/Remove event. It needs an Add event
// before an Update event, at which point its state changes from Assumed to Added.
case ok && !cache.assumedPods[key]:
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
klog.Errorf("Pod %v updated on a different node than previously added to.", key)
klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
if err := cache.updatePod(oldPod, newPod); err != nil {
return err
}
currState.pod = newPod
default:
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
}
return nil
}
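// RemovePod removes an added (non-assumed) pod and its state from the cache.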
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have a Delete/Remove event. It needs an Add event
// before a Remove event, at which point its state changes from Assumed to Added.
case ok && !cache.assumedPods[key]:
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
err := cache.removePod(currState.pod)
if err != nil {
return err
}
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
}
return nil
}
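// IsAssumedPod reports whether the pod is currently recorded as assumed.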
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return false, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
b, found := cache.assumedPods[key]
if !found {
return false, nil
}
return b, nil
}
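// GetPod returns the cached version of the given pod, looked up by its pod key.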
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
key, err := schedulernodeinfo.GetPodKey(pod)
if err != nil {
return nil, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
podState, ok := cache.podStates[key]
if !ok {
return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
}
return podState.pod, nil
}
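// AddNode adds or refreshes a node in the cache, including its entry in the
// node tree and the image states reported on the node.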
func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(node.Name)
cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n.info)
return n.info.SetNode(node)
}
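// UpdateNode refreshes the cached NodeInfo, node tree entry, and image states
// for the given node.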
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[newNode.Name]
if !ok {
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[newNode.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(newNode.Name)
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n.info)
return n.info.SetNode(newNode)
}
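// RemoveNode removes the node from the cache and from the node tree. The
// NodeInfo itself is kept as long as pods are still reported on the node; see
// the comment below.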
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
if err := n.info.RemoveNode(node); err != nil {
return err
}
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.info.Pods()) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
cache.nodeTree.RemoveNode(node)
cache.removeNodeImageStates(node)
return nil
}
// addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates
// imageStates in the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulernodeinfo.NodeInfo) {
newSum := make(map[string]*schedulernodeinfo.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := cache.imageStates[name]
if !ok {
state = &imageState{
size: image.SizeBytes,
nodes: sets.NewString(node.Name),
}
cache.imageStates[name] = state
} else {
state.nodes.Insert(node.Name)
}
// create the imageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = cache.createImageStateSummary(state)
}
}
}
nodeInfo.SetImageStates(newSum)
}
// removeNodeImageStates removes the given node record from the image entries in the
// imageStates cache. After the removal, if an image is no longer present on any node,
// its entry is removed from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
if node == nil {
return
}
for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := cache.imageStates[name]
if ok {
state.nodes.Delete(node.Name)
if len(state.nodes) == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(cache.imageStates, name)
}
}
}
}
}
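// run starts the background goroutine that periodically expires assumed pods.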
func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
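// cleanupExpiredAssumedPods expires assumed pods using the current wall-clock time.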
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists to make tests deterministic by taking the current time as an argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if !ps.bindingFinished {
klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
if now.After(*ps.deadline) {
klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {
klog.Errorf("ExpirePod failed for %s: %v", key, err)
}
}
}
}
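// expirePod removes an expired assumed pod and its bookkeeping from the cache.
// Assumes that lock is already acquired.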
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
if err := cache.removePod(ps.pod); err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
return nil
}
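// NodeTree returns the NodeTree maintained by the cache.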
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}