1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
etcd.go 21.91 KB
一键复制 编辑 原始数据 按行查看 历史
Chao Xu 提交于 2016-03-02 23:31 . ignore NotFound error in etcd
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"fmt"
"reflect"
"sync"
"k8s.io/kubernetes/pkg/api"
kubeerr "k8s.io/kubernetes/pkg/api/errors"
etcderr "k8s.io/kubernetes/pkg/api/errors/etcd"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Etcd implements generic.Registry, backing it with etcd storage.
// It's intended to be embeddable, so that you can implement any
// non-generic functions if needed.
// You must supply a value for every field below before use; these are
// left public as it's meant to be overridable if need be.
// This object is intended to be copyable so that it can be used in
// different ways but share the same underlying behavior.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on
// a Kubelike resource, handling details like conflict detection with
// ResourceVersion and semantics. The RESTCreateStrategy and
// RESTUpdateStrategy are generic across all backends, and encapsulate
// logic specific to the API.
//
// TODO: make the default exposed methods exactly match a generic RESTStorage
// TODO: because all aspects of etcd have been removed it should really
// just be called a registry implementation.
type Etcd struct {
// Called to make a new object, should return e.g., &api.Pod{}
NewFunc func() runtime.Object
// Called to make a new listing object, should return e.g., &api.PodList{}
NewListFunc func() runtime.Object
// Used for error reporting
QualifiedResource unversioned.GroupResource
// Used for listing/watching; should not include trailing "/"
KeyRootFunc func(ctx api.Context) string
// Called for Create/Update/Get/Delete. Note that 'namespace' can be
// gotten from ctx.
KeyFunc func(ctx api.Context, name string) (string, error)
// Called to get the name of an object
ObjectNameFunc func(obj runtime.Object) (string, error)
// Return the TTL objects should be persisted with. Update is true if this
// is an operation against an existing object. Existing is the current TTL
// or the default for this operation.
TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error)
// Returns a matcher corresponding to the provided labels and fields.
PredicateFunc func(label labels.Selector, field fields.Selector) generic.Matcher
// DeleteCollectionWorkers is the maximum number of workers in a single
// DeleteCollection call.
DeleteCollectionWorkers int
// Called on all objects returned from the underlying store, after
// the exit hooks are invoked. Decorators are intended for integrations
// that are above etcd and should only be used for specific cases where
// storage of the value in etcd is not appropriate, since they cannot
// be watched.
Decorator rest.ObjectFunc
// Allows extended behavior during creation, required
CreateStrategy rest.RESTCreateStrategy
// On create of an object, attempt to run a further operation.
AfterCreate rest.ObjectFunc
// Allows extended behavior during updates, required
UpdateStrategy rest.RESTUpdateStrategy
// On update of an object, attempt to run a further operation.
AfterUpdate rest.ObjectFunc
// Allows extended behavior during updates, optional
DeleteStrategy rest.RESTDeleteStrategy
// On deletion of an object, attempt to run a further operation.
AfterDelete rest.ObjectFunc
// If true, return the object that was deleted. Otherwise, return a generic
// success status response.
ReturnDeletedObject bool
// Allows extended behavior during export, optional
ExportStrategy rest.RESTExportStrategy
// Used for all etcd access functions
Storage storage.Interface
}
// NamespaceKeyRootFunc is the default function for constructing etcd paths to resource directories enforcing namespace rules.
func NamespaceKeyRootFunc(ctx api.Context, prefix string) string {
key := prefix
ns, ok := api.NamespaceFrom(ctx)
if ok && len(ns) > 0 {
key = key + "/" + ns
}
return key
}
// NamespaceKeyFunc is the default function for constructing etcd paths to a resource relative to prefix enforcing namespace rules.
// If no namespace is on context, it errors.
func NamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, error) {
key := NamespaceKeyRootFunc(ctx, prefix)
ns, ok := api.NamespaceFrom(ctx)
if !ok || len(ns) == 0 {
return "", kubeerr.NewBadRequest("Namespace parameter required.")
}
if len(name) == 0 {
return "", kubeerr.NewBadRequest("Name parameter required.")
}
if ok, msg := validation.IsValidPathSegmentName(name); !ok {
return "", kubeerr.NewBadRequest(fmt.Sprintf("Name parameter invalid: %v.", msg))
}
key = key + "/" + name
return key, nil
}
// NoNamespaceKeyFunc is the default function for constructing etcd paths to a resource relative to prefix without a namespace
func NoNamespaceKeyFunc(ctx api.Context, prefix string, name string) (string, error) {
if len(name) == 0 {
return "", kubeerr.NewBadRequest("Name parameter required.")
}
if ok, msg := validation.IsValidPathSegmentName(name); !ok {
return "", kubeerr.NewBadRequest(fmt.Sprintf("Name parameter invalid: %v.", msg))
}
key := prefix + "/" + name
return key, nil
}
// New implements RESTStorage
func (e *Etcd) New() runtime.Object {
return e.NewFunc()
}
// NewList implements RESTLister
func (e *Etcd) NewList() runtime.Object {
return e.NewListFunc()
}
// List returns a list of items matching labels and field
func (e *Etcd) List(ctx api.Context, options *api.ListOptions) (runtime.Object, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
return e.ListPredicate(ctx, e.PredicateFunc(label, field), options)
}
// ListPredicate returns a list of all the items matching m.
func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *api.ListOptions) (runtime.Object, error) {
list := e.NewListFunc()
filterFunc := e.filterAndDecorateFunction(m)
if name, ok := m.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, filterFunc, list)
return list, etcderr.InterpretListError(err, e.QualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
if options == nil {
options = &api.ListOptions{ResourceVersion: "0"}
}
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filterFunc, list)
return list, etcderr.InterpretListError(err, e.QualifiedResource)
}
// Create inserts a new item according to the unique key from the object.
func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, err
}
out := e.NewFunc()
if err := e.Storage.Create(ctx, key, obj, out, ttl); err != nil {
err = etcderr.InterpretCreateError(err, e.QualifiedResource, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
return nil, err
}
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, err
}
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return out, nil
}
// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.
func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, false, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, false, err
}
// If AllowUnconditionalUpdate() is true and the object specified by the user does not have a resource version,
// then we populate it with the latest version.
// Else, we check that the version specified by the user matches the version of latest etcd object.
resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
if err != nil {
return nil, false, err
}
doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()
// TODO: expose TTL
creating := false
out := e.NewFunc()
err = e.Storage.GuaranteedUpdate(ctx, key, out, true, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
// Since we return 'obj' from this function and it can be modified outside this
// function, we are resetting resourceVersion to the initial value here.
//
// TODO: In fact, we should probably return a DeepCopy of obj in all places.
err := e.Storage.Versioner().UpdateObject(obj, nil, resourceVersion)
if err != nil {
return nil, nil, err
}
version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
if err != nil {
return nil, nil, err
}
if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, nil, kubeerr.NewNotFound(e.QualifiedResource, name)
}
creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, nil, err
}
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, nil, err
}
return obj, &ttl, nil
}
creating = false
if doUnconditionalUpdate {
// Update the object's resource version to match the latest etcd object's resource version.
err = e.Storage.Versioner().UpdateObject(obj, res.Expiration, res.ResourceVersion)
if err != nil {
return nil, nil, err
}
} else {
// Check if the object's resource version matches the latest resource version.
newVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
if err != nil {
return nil, nil, err
}
if newVersion == 0 {
// TODO: The Invalid error should has a field for Resource.
// After that field is added, we should fill the Resource and
// leave the Kind field empty. See the discussion in #18526.
qualifiedKind := unversioned.GroupKind{e.QualifiedResource.Group, e.QualifiedResource.Resource}
fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), newVersion, "must be specified for an update")}
return nil, nil, kubeerr.NewInvalid(qualifiedKind, name, fieldErrList)
}
if newVersion != version {
return nil, nil, kubeerr.NewConflict(e.QualifiedResource, name, fmt.Errorf("the object has been modified; please apply your changes to the latest version and try again"))
}
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, nil, err
}
ttl, err := e.calculateTTL(obj, res.TTL, true)
if err != nil {
return nil, nil, err
}
if int64(ttl) != res.TTL {
return obj, &ttl, nil
}
return obj, nil, nil
})
if err != nil {
if creating {
err = etcderr.InterpretCreateError(err, e.QualifiedResource, name)
err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj)
} else {
err = etcderr.InterpretUpdateError(err, e.QualifiedResource, name)
}
return nil, false, err
}
if creating {
if e.AfterCreate != nil {
if err := e.AfterCreate(out); err != nil {
return nil, false, err
}
}
} else {
if e.AfterUpdate != nil {
if err := e.AfterUpdate(out); err != nil {
return nil, false, err
}
}
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, false, err
}
}
return out, creating, nil
}
// Get retrieves the item from etcd.
func (e *Etcd) Get(ctx api.Context, name string) (runtime.Object, error) {
obj := e.NewFunc()
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
if err := e.Storage.Get(ctx, key, obj, false); err != nil {
return nil, etcderr.InterpretGetError(err, e.QualifiedResource, name)
}
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return obj, nil
}
var (
errAlreadyDeleting = fmt.Errorf("abort delete")
errDeleteNow = fmt.Errorf("delete now")
)
// Delete removes the item from etcd.
func (e *Etcd) Delete(ctx api.Context, name string, options *api.DeleteOptions) (runtime.Object, error) {
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
obj := e.NewFunc()
if err := e.Storage.Get(ctx, key, obj, false); err != nil {
return nil, etcderr.InterpretDeleteError(err, e.QualifiedResource, name)
}
// support older consumers of delete by treating "nil" as delete immediately
if options == nil {
options = api.NewDeleteOptions(0)
}
graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, obj, options)
if err != nil {
return nil, err
}
if pendingGraceful {
return e.finalizeDelete(obj, false)
}
var ignoreNotFound bool = false
var lastExisting runtime.Object = nil
if graceful {
out := e.NewFunc()
lastGraceful := int64(0)
err := e.Storage.GuaranteedUpdate(
ctx, key, out, false,
storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
graceful, pendingGraceful, err := rest.BeforeDelete(e.DeleteStrategy, ctx, existing, options)
if err != nil {
return nil, err
}
if pendingGraceful {
return nil, errAlreadyDeleting
}
if !graceful {
return nil, errDeleteNow
}
lastGraceful = *options.GracePeriodSeconds
lastExisting = existing
return existing, nil
}),
)
switch err {
case nil:
if lastGraceful > 0 {
return out, nil
}
// If we are here, the registry supports grace period mechanism and
// we are intentionally delete gracelessly. In this case, we may
// enter a race with other k8s components. If other component wins
// the race, the object will not be found, and we should tolerate
// the NotFound error. See
// https://github.com/kubernetes/kubernetes/issues/19403 for
// details.
ignoreNotFound = true
// exit the switch and delete immediately
case errDeleteNow:
// we've updated the object to have a zero grace period, or it's already at 0, so
// we should fall through and truly delete the object.
case errAlreadyDeleting:
return e.finalizeDelete(obj, true)
default:
return nil, etcderr.InterpretUpdateError(err, e.QualifiedResource, name)
}
}
// delete immediately, or no graceful deletion supported
out := e.NewFunc()
if err := e.Storage.Delete(ctx, key, out); err != nil {
// Please refer to the place where we set ignoreNotFound for the reason
// why we ignore the NotFound error .
if storage.IsNotFound(err) && ignoreNotFound && lastExisting != nil {
// The lastExisting object may not be the last state of the object
// before its deletion, but it's the best approximation.
return e.finalizeDelete(lastExisting, true)
}
return nil, etcderr.InterpretDeleteError(err, e.QualifiedResource, name)
}
return e.finalizeDelete(out, true)
}
// DeleteCollection remove all items returned by List with a given ListOptions from etcd.
//
// DeleteCollection is currently NOT atomic. It can happen that only subset of objects
// will be deleted from etcd, and then an error will be returned.
// In case of success, the list of deleted objects will be returned.
//
// TODO: Currently, there is no easy way to remove 'directory' entry from etcd (if we
// are removing all objects of a given type) with the current API (it's technically
// possibly with etcd API, but watch is not delivered correctly then).
// It will be possible to fix it with v3 etcd API.
func (e *Etcd) DeleteCollection(ctx api.Context, options *api.DeleteOptions, listOptions *api.ListOptions) (runtime.Object, error) {
listObj, err := e.List(ctx, listOptions)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(listObj)
if err != nil {
return nil, err
}
// Spawn a number of goroutines, so that we can issue requests to etcd
// in parallel to speed up deletion.
// TODO: Make this proportional to the number of items to delete, up to
// DeleteCollectionWorkers (it doesn't make much sense to spawn 16
// workers to delete 10 items).
workersNumber := e.DeleteCollectionWorkers
if workersNumber < 1 {
workersNumber = 1
}
wg := sync.WaitGroup{}
toProcess := make(chan int, 2*workersNumber)
errs := make(chan error, workersNumber+1)
go func() {
defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason)
})
for i := 0; i < len(items); i++ {
toProcess <- i
}
close(toProcess)
}()
wg.Add(workersNumber)
for i := 0; i < workersNumber; i++ {
go func() {
// panics don't cross goroutine boundaries
defer utilruntime.HandleCrash(func(panicReason interface{}) {
errs <- fmt.Errorf("DeleteCollection goroutine panicked: %v", panicReason)
})
defer wg.Done()
for {
index, ok := <-toProcess
if !ok {
return
}
accessor, err := meta.Accessor(items[index])
if err != nil {
errs <- err
return
}
if _, err := e.Delete(ctx, accessor.GetName(), options); err != nil && !kubeerr.IsNotFound(err) {
glog.V(4).Infof("Delete %s in DeleteCollection failed: %v", accessor.GetName(), err)
errs <- err
return
}
}
}()
}
wg.Wait()
select {
case err := <-errs:
return nil, err
default:
return listObj, nil
}
}
func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object, error) {
if runHooks && e.AfterDelete != nil {
if err := e.AfterDelete(obj); err != nil {
return nil, err
}
}
if e.ReturnDeletedObject {
if e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
return nil, err
}
}
return obj, nil
}
return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
}
// Watch makes a matcher for the given label and field, and calls
// WatchPredicate. If possible, you should customize PredicateFunc to produre a
// matcher that matches by key. generic.SelectionPredicate does this for you
// automatically.
func (e *Etcd) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}
field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
resourceVersion := ""
if options != nil {
resourceVersion = options.ResourceVersion
}
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
}
// WatchPredicate starts a watch for the items that m matches.
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
filterFunc := e.filterAndDecorateFunction(m)
if name, ok := m.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
if err != nil {
return nil, err
}
return e.Storage.Watch(ctx, key, resourceVersion, filterFunc)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
}
func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool {
return func(obj runtime.Object) bool {
matches, err := m.Matches(obj)
if err != nil {
glog.Errorf("unable to match watch: %v", err)
return false
}
if matches && e.Decorator != nil {
if err := e.Decorator(obj); err != nil {
glog.Errorf("unable to decorate watch: %v", err)
return false
}
}
return matches
}
}
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error
// if the TTL cannot be calculated. The defaultTTL is changed to 1 if less than zero. Zero means
// no TTL, not expire immediately.
func (e *Etcd) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) {
// etcd may return a negative TTL for a node if the expiration has not occurred due
// to server lag - we will ensure that the value is at least set.
if defaultTTL < 0 {
defaultTTL = 1
}
ttl = uint64(defaultTTL)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, ttl, update)
}
return ttl, err
}
func exportObjectMeta(objMeta *api.ObjectMeta, exact bool) {
objMeta.UID = ""
if !exact {
objMeta.Namespace = ""
}
objMeta.CreationTimestamp = unversioned.Time{}
objMeta.DeletionTimestamp = nil
objMeta.ResourceVersion = ""
objMeta.SelfLink = ""
if len(objMeta.GenerateName) > 0 && !exact {
objMeta.Name = ""
}
}
// Implements the rest.Exporter interface
func (e *Etcd) Export(ctx api.Context, name string, opts unversioned.ExportOptions) (runtime.Object, error) {
obj, err := e.Get(ctx, name)
if err != nil {
return nil, err
}
if meta, err := api.ObjectMetaFor(obj); err == nil {
exportObjectMeta(meta, opts.Exact)
} else {
glog.V(4).Infof("Object of type %v does not have ObjectMeta: %v", reflect.TypeOf(obj), err)
}
if e.ExportStrategy != nil {
if err = e.ExportStrategy.Export(obj, opts.Exact); err != nil {
return nil, err
}
} else {
e.CreateStrategy.PrepareForCreate(obj)
}
return obj, nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.2.8-beta.0

搜索帮助

Dd8185d8 1850385 E526c682 1850385