1 Star 0 Fork 0

李童/training-operator

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
service_control.go 7.66 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2019 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package control
import (
"context"
"fmt"
"sync"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)
// Event reasons used when creating or deleting a service on behalf of a job.
const (
// FailedCreateServiceReason is added in an event and in a job controller condition
// when a service for a job fails to be created.
FailedCreateServiceReason = "FailedCreateService"
// SuccessfulCreateServiceReason is added in an event when a service for a job
// is successfully created.
SuccessfulCreateServiceReason = "SuccessfulCreateService"
// FailedDeleteServiceReason is added in an event and in a job condition
// when a service for a job fails to be deleted.
FailedDeleteServiceReason = "FailedDeleteService"
// SuccessfulDeleteServiceReason is added in an event when a service for a job
// is successfully deleted.
SuccessfulDeleteServiceReason = "SuccessfulDeleteService"
)
// ServiceControlInterface abstracts creating, patching and deleting Services.
// It is declared as an interface so that a fake implementation can be
// substituted in tests.
type ServiceControlInterface interface {
// CreateServices creates a new Service in namespace from the given service spec.
// object is the parent the operation is performed on behalf of.
CreateServices(namespace string, service *v1.Service, object runtime.Object) error
// CreateServicesWithControllerRef creates a new Service in namespace from the
// given service spec and associates it with the supplied controllerRef.
CreateServicesWithControllerRef(namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error
// PatchService patches the service called name in namespace with data.
PatchService(namespace, name string, data []byte) error
// DeleteService deletes the service identified by serviceID in namespace.
DeleteService(namespace, serviceID string, object runtime.Object) error
}
// RealServiceControl is the default implementation of ServiceControlInterface.
// It talks to the cluster through KubeClient and reports outcomes as events
// through Recorder.
type RealServiceControl struct {
// KubeClient is the clientset used to reach the CoreV1 Services API.
KubeClient clientset.Interface
// Recorder emits create/delete success and failure events on the parent object.
Recorder record.EventRecorder
}
// PatchService sends data as a strategic merge patch for the service called
// name in namespace. The patched object returned by the API is discarded;
// only the error, if any, is reported to the caller.
func (r RealServiceControl) PatchService(namespace, name string, data []byte) error {
	services := r.KubeClient.CoreV1().Services(namespace)
	if _, err := services.Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{}); err != nil {
		return err
	}
	return nil
}
// CreateServices creates a service in namespace from the given template on
// behalf of object. No controller reference is passed along on this path.
func (r RealServiceControl) CreateServices(namespace string, service *v1.Service, object runtime.Object) error {
	err := r.createServices(namespace, service, object, nil)
	return err
}
// CreateServicesWithControllerRef creates a service in namespace from the
// given template and passes controllerRef along to the creation path.
// controllerRef is checked with ValidateControllerRef first; a validation
// error is returned as-is and nothing is created.
func (r RealServiceControl) CreateServicesWithControllerRef(namespace string, service *v1.Service, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	validationErr := ValidateControllerRef(controllerRef)
	if validationErr != nil {
		return validationErr
	}
	return r.createServices(namespace, service, controllerObject, controllerRef)
}
// createServices builds a service from the template via GetServiceFromTemplate
// and creates it in namespace.
//
// The template must carry at least one label, otherwise an error is returned
// before anything is sent to the API. A warning event is recorded on object
// when building or creating the service fails, and a normal event when the
// service has been created.
//
// Returned errors wrap the underlying cause with %w so that callers can still
// classify it (for example with errors.Is/errors.As or the apimachinery
// IsTimeout/IsAlreadyExists helpers); the message text is unchanged.
func (r RealServiceControl) createServices(namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	if labels.Set(service.Labels).AsSelectorPreValidated().Empty() {
		return fmt.Errorf("unable to create Services, no labels")
	}

	serviceWithOwner, err := GetServiceFromTemplate(service, object, controllerRef)
	if err != nil {
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreateServiceReason, "Error creating: %v", err)
		return fmt.Errorf("unable to create services: %w", err)
	}

	newService, err := r.KubeClient.CoreV1().Services(namespace).Create(context.TODO(), serviceWithOwner, metav1.CreateOptions{})
	if err != nil {
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreateServiceReason, "Error creating: %v", err)
		return fmt.Errorf("unable to create services: %w", err)
	}

	accessor, err := meta.Accessor(object)
	if err != nil {
		// The service already exists at this point, so a parent without
		// ObjectMeta is only logged and the call still reports success.
		log.Errorf("parentObject does not have ObjectMeta, %v", err)
		return nil
	}

	log.Infof("Controller %v created service %v", accessor.GetName(), newService.Name)
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreateServiceReason, "Created service: %v", newService.Name)
	return nil
}
// DeleteService deletes the service identified by serviceID in namespace.
//
// The service is fetched first: if it is not found, or it already has a
// deletion timestamp, the call returns nil without issuing a delete. After
// the delete call a warning event is recorded on object on failure and a
// normal event on success.
//
// Errors produced here wrap the underlying cause with %w so that callers can
// still inspect it; the message text is unchanged.
func (r RealServiceControl) DeleteService(namespace, serviceID string, object runtime.Object) error {
	accessor, err := meta.Accessor(object)
	if err != nil {
		return fmt.Errorf("object does not have ObjectMeta, %w", err)
	}

	service, err := r.KubeClient.CoreV1().Services(namespace).Get(context.TODO(), serviceID, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			// Already gone: nothing left to delete.
			return nil
		}
		return err
	}
	if service.DeletionTimestamp != nil {
		log.Infof("service %s/%s is terminating, skip deleting", service.Namespace, service.Name)
		return nil
	}

	log.Infof("Controller %v deleting service %v/%v", accessor.GetName(), namespace, serviceID)
	if err := r.KubeClient.CoreV1().Services(namespace).Delete(context.TODO(), serviceID, metav1.DeleteOptions{}); err != nil {
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeleteServiceReason, "Error deleting: %v", err)
		return fmt.Errorf("unable to delete service: %w", err)
	}

	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeleteServiceReason, "Deleted service: %v", serviceID)
	return nil
}
// FakeServiceControl is an in-memory ServiceControlInterface implementation
// that records the arguments it is called with instead of contacting a
// cluster. The embedded mutex is taken by every method.
type FakeServiceControl struct {
sync.Mutex
// Templates holds a copy of each service passed to a create call.
Templates []v1.Service
// ControllerRefs holds a copy of each controller reference passed to
// CreateServicesWithControllerRef.
ControllerRefs []metav1.OwnerReference
// DeleteServiceName holds each serviceID passed to DeleteService.
DeleteServiceName []string
// Patches holds each data payload passed to PatchService.
Patches [][]byte
// Err, when non-nil, is returned by the methods after they record the call.
Err error
// CreateLimit, when non-zero, is the number of create calls allowed before
// further create calls return an error.
CreateLimit int
// CreateCallCount is the number of create calls made so far.
CreateCallCount int
}
// Compile-time check that *FakeServiceControl satisfies ServiceControlInterface.
var _ ServiceControlInterface = &FakeServiceControl{}
// PatchService appends data to Patches and then returns Err, which is nil
// unless a test has configured it. namespace and name are not recorded.
func (f *FakeServiceControl) PatchService(namespace, name string, data []byte) error {
	f.Lock()
	defer f.Unlock()

	f.Patches = append(f.Patches, data)
	return f.Err
}
// CreateServices counts the call and, unless CreateLimit has been exceeded,
// stores a copy of service in Templates before returning Err. Once a non-zero
// CreateLimit is exceeded the call fails without recording the template.
func (f *FakeServiceControl) CreateServices(namespace string, service *v1.Service, object runtime.Object) error {
	f.Lock()
	defer f.Unlock()

	f.CreateCallCount++
	limitReached := f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit
	if limitReached {
		return fmt.Errorf("not creating service, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
	}

	f.Templates = append(f.Templates, *service)
	return f.Err
}
// CreateServicesWithControllerRef counts the call and, unless CreateLimit has
// been exceeded, stores copies of service and controllerRef in Templates and
// ControllerRefs before returning Err.
//
// A nil controllerRef is rejected with an error and nothing is recorded,
// instead of panicking on the dereference. RealServiceControl likewise returns
// an error for a controller reference that fails validation.
func (f *FakeServiceControl) CreateServicesWithControllerRef(namespace string, service *v1.Service, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	f.Lock()
	defer f.Unlock()

	f.CreateCallCount++
	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
		return fmt.Errorf("not creating service, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
	}
	if controllerRef == nil {
		return fmt.Errorf("controllerRef is nil")
	}

	f.Templates = append(f.Templates, *service)
	f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
	return f.Err
}
// DeleteService appends serviceID to DeleteServiceName and then returns Err,
// which is nil unless a test has configured it. namespace and object are not
// recorded.
func (f *FakeServiceControl) DeleteService(namespace string, serviceID string, object runtime.Object) error {
	f.Lock()
	defer f.Unlock()

	f.DeleteServiceName = append(f.DeleteServiceName, serviceID)
	return f.Err
}
// Clear resets the recorded calls to empty, non-nil slices and zeroes the
// create limit and create call counter. Err is left as it is.
func (f *FakeServiceControl) Clear() {
	f.Lock()
	defer f.Unlock()

	f.CreateCallCount = 0
	f.CreateLimit = 0
	f.Patches = make([][]byte, 0)
	f.ControllerRefs = make([]metav1.OwnerReference, 0)
	f.Templates = make([]v1.Service, 0)
	f.DeleteServiceName = make([]string, 0)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/vak80/training-operator.git
git@gitee.com:vak80/training-operator.git
vak80
training-operator
training-operator
v1.7.0-fix

搜索帮助