代码拉取完成,页面将自动刷新
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
package util
import (
"fmt"
"reflect"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
kubeflowv1 "gitee.com/vak80/training-operator/pkg/apis/kubeflow.org/v1"
"gitee.com/vak80/training-operator/pkg/controller.v1/common"
"gitee.com/vak80/training-operator/pkg/controller.v1/expectation"
commonutil "gitee.com/vak80/training-operator/pkg/util"
)
// SatisfiedExpectations returns true if the required adds/dels for the given mxjob have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func SatisfiedExpectations(exp expectation.ControllerExpectationsInterface, jobKey string, replicaTypes []kubeflowv1.ReplicaType) bool {
satisfied := false
for _, rtype := range replicaTypes {
// Check the expectations of the pods.
expectationPodsKey := expectation.GenExpectationPodsKey(jobKey, string(rtype))
satisfied = satisfied || exp.SatisfiedExpectations(expectationPodsKey)
// Check the expectations of the services.
expectationServicesKey := expectation.GenExpectationServicesKey(jobKey, string(rtype))
satisfied = satisfied || exp.SatisfiedExpectations(expectationServicesKey)
}
return satisfied
}
// OnDependentCreateFunc modify expectations when dependent (pod/service) creation observed.
func OnDependentCreateFunc(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
return func(e event.CreateEvent) bool {
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
//logrus.Info("Update on create function ", ptjr.ControllerName(), " create object ", e.Object.GetName())
if controllerRef := metav1.GetControllerOf(e.Object); controllerRef != nil {
jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
var expectKey string
switch e.Object.(type) {
case *corev1.Pod:
expectKey = expectation.GenExpectationPodsKey(jobKey, rtype)
case *corev1.Service:
expectKey = expectation.GenExpectationServicesKey(jobKey, rtype)
default:
return false
}
exp.CreationObserved(expectKey)
return true
}
return true
}
}
// OnDependentUpdateFunc modify expectations when dependent (pod/service) update observed.
func OnDependentUpdateFunc(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
return func(e event.UpdateEvent) bool {
newObj := e.ObjectNew
oldObj := e.ObjectOld
if newObj.GetResourceVersion() == oldObj.GetResourceVersion() {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
return false
}
kind := jc.Controller.GetAPIGroupVersionKind().Kind
var logger = LoggerForGenericKind(newObj, kind)
switch obj := newObj.(type) {
case *corev1.Pod:
logger = commonutil.LoggerForPod(obj, jc.Controller.GetAPIGroupVersionKind().Kind)
case *corev1.Service:
logger = commonutil.LoggerForService(newObj.(*corev1.Service), jc.Controller.GetAPIGroupVersionKind().Kind)
default:
return false
}
newControllerRef := metav1.GetControllerOf(newObj)
oldControllerRef := metav1.GetControllerOf(oldObj)
controllerRefChanged := !reflect.DeepEqual(newControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := resolveControllerRef(jc, oldObj.GetNamespace(), oldControllerRef); job != nil {
logger.Infof("pod/service controller ref updated: %v, %v", newObj, oldObj)
return true
}
}
// If it has a controller ref, that's all that matters.
if newControllerRef != nil {
job := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef)
if job == nil {
return false
}
logger.Debugf("pod/service has a controller ref: %v, %v", newObj, oldObj)
return true
}
return false
}
}
// resolveControllerRef returns the job referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching job
// of the correct Kind.
func resolveControllerRef(jc *common.JobController, namespace string, controllerRef *metav1.OwnerReference) metav1.Object {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != jc.Controller.GetAPIGroupVersionKind().Kind {
return nil
}
job, err := jc.Controller.GetJobFromInformerCache(namespace, controllerRef.Name)
if err != nil {
return nil
}
if job.GetUID() != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return job
}
// OnDependentDeleteFunc modify expectations when dependent (pod/service) deletion observed.
func OnDependentDeleteFunc(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
return func(e event.DeleteEvent) bool {
rtype := e.Object.GetLabels()[kubeflowv1.ReplicaTypeLabel]
if len(rtype) == 0 {
return false
}
// logrus.Info("Update on deleting function ", xgbr.ControllerName(), " delete object ", e.Object.GetName())
if controllerRef := metav1.GetControllerOf(e.Object); controllerRef != nil {
jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), controllerRef.Name)
var expectKey string
switch e.Object.(type) {
case *corev1.Pod:
expectKey = expectation.GenExpectationPodsKey(jobKey, rtype)
case *corev1.Service:
expectKey = expectation.GenExpectationServicesKey(jobKey, rtype)
default:
return false
}
exp.DeletionObserved(expectKey)
return true
}
return true
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。