1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
controller_utils.go 9.88 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"context"
"errors"
"fmt"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/kubelet/util/format"
nodepkg "k8s.io/kubernetes/pkg/util/node"
"github.com/golang/glog"
)
// ErrCloudInstance is the sentinel error used when the cloud provider does
// not support the Instances API.
var ErrCloudInstance = errors.New("cloud provider doesn't support instances")
// DeletePods lists every pod whose host field matches nodeName and issues a
// delete for each one that is not already terminating and is not managed by
// a DaemonSet.
//
// The boolean result is true when at least one pod was deleted by this call
// or was found already pending deletion. It is false whenever an error is
// returned.
//
// Errors:
//   - a failure to list pods or to delete a pod is returned immediately;
//   - conflict errors from the status update are collected, the affected pod
//     is skipped, and the collection is returned as an aggregate once every
//     pod has been visited;
//   - any other status update error is ignored and the pod is still
//     processed.
func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
	listOpts := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String(),
	}
	podList, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(listOpts)
	if err != nil {
		return false, err
	}

	if len(podList.Items) > 0 {
		RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
	}

	var (
		conflicts []error
		pending   bool
	)
	for _, pod := range podList.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}

		// Record the termination reason and message on the pod. Only a
		// conflict makes us skip the pod; other failures fall through.
		if _, statusErr := SetPodTerminationReason(kubeClient, &pod, nodeName); statusErr != nil && apierrors.IsConflict(statusErr) {
			conflicts = append(conflicts,
				fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), statusErr))
			continue
		}

		// A pod that is already marked for deletion still counts as remaining.
		if pod.DeletionGracePeriodSeconds != nil {
			pending = true
			continue
		}

		// A nil error means at least one DaemonSet owns the pod; leave it alone.
		if _, lookupErr := daemonStore.GetPodDaemonSets(&pod); lookupErr == nil {
			continue
		}

		glog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
		recorder.Eventf(&pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
		if deleteErr := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); deleteErr != nil {
			return false, deleteErr
		}
		pending = true
	}

	if len(conflicts) > 0 {
		return false, utilerrors.NewAggregate(conflicts)
	}
	return pending, nil
}
// SetPodTerminationReason stamps the node-unreachable reason and message on
// the pod's status and pushes the status to the apiserver.
//
// If the pod already carries that reason, no update is sent and the pod is
// returned unchanged. Otherwise the passed-in pod is modified in place and
// the pod returned by the apiserver is handed back; on an update failure the
// result is nil together with the error.
func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
	// Nothing to do when the reason has been recorded before.
	if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
		return pod, nil
	}

	pod.Status.Reason = nodepkg.NodeUnreachablePodReason
	pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)

	updated, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
	if err != nil {
		return nil, err
	}
	return updated, nil
}
// ForcefullyDeleteNode removes the node object right away. Pods left on the
// node are cleaned up by the podGC.
//
// A delete failure is returned wrapped in a message that names the node.
func ForcefullyDeleteNode(kubeClient clientset.Interface, nodeName string) error {
	err := kubeClient.CoreV1().Nodes().Delete(nodeName, nil)
	if err == nil {
		return nil
	}
	return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
// MarkAllPodsNotReady sets the PodReady condition to false on every pod
// listed for the given node and pushes each changed status to the apiserver.
//
// It returns nil when every update succeeded. A list failure is returned
// as-is. Individual update failures are logged, do not stop the loop, and are
// reported together in one error whose message joins them with "; ".
func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error {
	nodeName := node.Name
	glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)

	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
	podList, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{FieldSelector: selector})
	if err != nil {
		return err
	}

	var failures []string
	for _, pod := range podList.Items {
		// Defensive check, also needed for tests.
		if pod.Spec.NodeName != nodeName {
			continue
		}

		for idx := range pod.Status.Conditions {
			if pod.Status.Conditions[idx].Type != v1.PodReady {
				continue
			}

			pod.Status.Conditions[idx].Status = v1.ConditionFalse
			glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
			if _, updateErr := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(&pod); updateErr != nil {
				glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), updateErr)
				failures = append(failures, fmt.Sprintf("%v", updateErr))
			}
			// Only the first PodReady condition is handled.
			break
		}
	}

	if len(failures) > 0 {
		return fmt.Errorf("%v", strings.Join(failures, "; "))
	}
	return nil
}
// ExistsInCloudProvider reports whether the node exists in the cloud
// provider.
//
// It returns:
//   - (false, ErrCloudInstance) when the provider does not expose the
//     Instances API; the sentinel is returned unwrapped so callers can
//     compare against ErrCloudInstance directly;
//   - (false, nil) when the ExternalID lookup reports
//     cloudprovider.InstanceNotFound;
//   - (false, err) for any other lookup error;
//   - (true, nil) when the lookup succeeds.
func ExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeName) (bool, error) {
	instances, ok := cloud.Instances()
	if !ok {
		// Return the sentinel itself rather than a re-formatted copy, so its
		// identity is preserved. The message text is unchanged.
		return false, ErrCloudInstance
	}

	_, err := instances.ExternalID(context.TODO(), nodeName)
	switch {
	case err == nil:
		return true, nil
	case err == cloudprovider.InstanceNotFound:
		// A missing instance is a valid answer, not a failure.
		return false, nil
	default:
		return false, err
	}
}
// RecordNodeEvent emits an event of the given type and reason against a Node
// object reference built from nodeName and nodeUID. The event message is
// "Node <nodeName> event: <event>".
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
	nodeRef := v1.ObjectReference{
		Kind:      "Node",
		Name:      nodeName,
		UID:       types.UID(nodeUID),
		Namespace: "",
	}

	glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
	recorder.Eventf(&nodeRef, eventtype, reason, "Node %s event: %s", nodeName, event)
}
// RecordNodeStatusChange emits a Normal event announcing that the node's
// status is now newStatus; newStatus is also used as the event reason.
// (Common to lifecycle and ipam)
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
	nodeRef := v1.ObjectReference{
		Kind:      "Node",
		Name:      node.Name,
		UID:       node.UID,
		Namespace: "",
	}

	glog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
	// TODO: This requires a transaction, either both node status is updated
	// and event is recorded or neither should happen, see issue #6055.
	recorder.Eventf(&nodeRef, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
}
// SwapNodeControllerTaint adds taintsToAdd to the node and then removes
// taintsToRemove from it. It returns true when both steps succeed and false
// otherwise.
//
// Each taint in taintsToAdd has its TimeAdded field overwritten with the
// current time before it is applied, so the caller's taint objects are
// modified. Failures are passed to utilruntime.HandleError rather than
// returned; if adding fails, removal is not attempted.
func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
	for i := range taintsToAdd {
		stamp := metav1.Now()
		taintsToAdd[i].TimeAdded = &stamp
	}

	if addErr := controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, taintsToAdd...); addErr != nil {
		utilruntime.HandleError(fmt.Errorf("unable to taint %+v unresponsive Node %q: %v", taintsToAdd, node.Name, addErr))
		return false
	}
	glog.V(4).Infof("Added %+v Taint to Node %v", taintsToAdd, node.Name)

	if removeErr := controller.RemoveTaintOffNode(kubeClient, node.Name, node, taintsToRemove...); removeErr != nil {
		utilruntime.HandleError(fmt.Errorf("unable to remove %+v unneeded taint from unresponsive Node %q: %v", taintsToRemove, node.Name, removeErr))
		return false
	}
	glog.V(4).Infof("Made sure that Node %+v has no %v Taint", node.Name, taintsToRemove)

	return true
}
// CreateAddNodeHandler wraps f in a callback that takes the added object as
// an interface{} value. The callback asserts the object to *v1.Node, hands a
// deep copy to f, and passes any error f returns to utilruntime.HandleError.
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	handler := func(added interface{}) {
		// Work on a copy so f cannot modify the object that was passed in.
		nodeCopy := added.(*v1.Node).DeepCopy()

		err := f(nodeCopy)
		if err == nil {
			return
		}
		utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
	}
	return handler
}
// CreateUpdateNodeHandler wraps f in a callback that takes the old and new
// objects as interface{} values. The callback asserts both to *v1.Node, hands
// deep copies to f as (old, new), and passes any error f returns to
// utilruntime.HandleError. (Common to lifecycle and ipam)
func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
	return func(origOldObj, origNewObj interface{}) {
		// Work on copies so f cannot modify the objects that were passed in.
		node := origNewObj.(*v1.Node).DeepCopy()
		prevNode := origOldObj.(*v1.Node).DeepCopy()

		if err := f(prevNode, node); err != nil {
			// Name the operation that actually failed; this is the update
			// path, not add/delete.
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Update: %v", err))
		}
	}
}
// CreateDeleteNodeHandler wraps f in a callback that takes the deleted object
// as an interface{} value. (Common to lifecycle and ipam)
//
// The callback accepts either a *v1.Node or a cache.DeletedFinalStateUnknown
// whose Obj is a *v1.Node. Anything else is logged and dropped. f receives a
// deep copy of the node, and any error it returns is passed to
// utilruntime.HandleError.
func CreateDeleteNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
	return func(deletedObj interface{}) {
		var deletedNode *v1.Node

		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
		// we need to handle that correctly. #34692
		switch typed := deletedObj.(type) {
		case *v1.Node:
			deletedNode = typed
		case cache.DeletedFinalStateUnknown:
			inner, isNode := typed.Obj.(*v1.Node)
			if !isNode {
				glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", typed.Obj)
				return
			}
			deletedNode = inner
		default:
			glog.Errorf("Received unexpected object: %v", deletedObj)
			return
		}

		// Work on a copy so f cannot modify the object that was passed in.
		nodeCopy := deletedNode.DeepCopy()
		if err := f(nodeCopy); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
		}
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.10.2

搜索帮助