1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
graph_builder.go 19.16 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"reflect"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
)
// eventType identifies which kind of informer notification an event carries.
type eventType int

// The known notification kinds, numbered from zero in declaration order.
const (
	addEvent eventType = iota
	updateEvent
	deleteEvent
)

// String returns "add", "update" or "delete" for the known event types and
// "unknown(<n>)" for any other value.
func (e eventType) String() string {
	if e == addEvent {
		return "add"
	}
	if e == updateEvent {
		return "update"
	}
	if e == deleteEvent {
		return "delete"
	}
	return fmt.Sprintf("unknown(%d)", int(e))
}
// event is the unit of work queued on graphChanges: one informer notification
// together with the group/version/kind the producing monitor was created for.
type event struct {
// eventType records whether this is an add, update, or delete notification.
eventType eventType
// obj is the object the notification is about; for update events it is the
// new version of the object.
obj interface{}
// oldObj is the previous version of the object. Only update events set it;
// it is passed to processTransitions, where deletionStarts compares it with
// the new object to detect the transition to "being deleted".
oldObj interface{}
// gvk is the kind handed to controllerFor when the monitor was built.
gvk schema.GroupVersionKind
}
// GraphBuilder consumes the events supplied by the informers and uses them to
// maintain uidToNode, a graph that caches the object dependencies as currently
// known. While doing so it enqueues nodes onto the attemptToDelete and
// attemptToOrphan queues.
type GraphBuilder struct {
// restMapper maps a resource to its kind when monitors are created.
restMapper meta.RESTMapper
// each monitor list/watches a resource, the results are funneled to the
// graphChanges queue.
monitors []cache.Controller
// metaOnlyClientPool uses a special codec, which removes fields except for
// apiVersion, kind, and metadata during decoding.
metaOnlyClientPool dynamic.ClientPool
// used to register exactly once the rate limiters of the clients used by
// the `monitors`.
registeredRateLimiterForControllers *RegisteredRateLimiter
// monitors are the producer of the graphChanges queue, graphBuilder alters
// the in-memory graph according to the changes.
graphChanges workqueue.RateLimitingInterface
// uidToNode is read and written by the single-threaded
// GraphBuilder.processGraphChanges().
// NOTE(review): an earlier comment said no lock is needed here, but the
// type name suggests the map is internally synchronized - confirm against
// the definition of concurrentUIDToNode.
uidToNode *concurrentUIDToNode
// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
attemptToDelete workqueue.RateLimitingInterface
attemptToOrphan workqueue.RateLimitingInterface
// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
// be non-existent are added to the cache.
absentOwnerCache *UIDCache
// sharedInformers is consulted first by controllerFor; a dedicated informer
// is only built when it cannot serve the resource.
sharedInformers informers.SharedInformerFactory
// stopCh is nil until Run is called; Run stores its stop channel here so
// that controllerFor can start shared informers added afterwards.
stopCh <-chan struct{}
// ignoredResources lists the group/resources monitorsForResources skips.
ignoredResources map[schema.GroupResource]struct{}
}
// listWatcher builds a cache.ListWatch that lists and watches resource across
// all namespaces through the given dynamic client.
//
// Only APIResource.Name is populated: APIResource.Kind is not used by the
// dynamic client, and leaving APIResource.Namespaced false is fine because the
// resource is always addressed with metav1.NamespaceAll.
func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch {
	listFn := func(options metav1.ListOptions) (runtime.Object, error) {
		target := metav1.APIResource{Name: resource.Resource}
		versioned := client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback)
		return versioned.Resource(&target, metav1.NamespaceAll).List(options)
	}
	watchFn := func(options metav1.ListOptions) (watch.Interface, error) {
		target := metav1.APIResource{Name: resource.Resource}
		versioned := client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback)
		return versioned.Resource(&target, metav1.NamespaceAll).Watch(options)
	}
	return &cache.ListWatch{
		ListFunc:  listFn,
		WatchFunc: watchFn,
	}
}
// controllerFor returns a controller whose event handlers wrap every add,
// update and delete notification for resource in an event tagged with kind and
// push it onto gb.graphChanges.
//
// A shared informer is used when gb.sharedInformers can provide one for the
// resource. Otherwise a dedicated informer is created on a client obtained
// from gb.metaOnlyClientPool; an error is returned only if that client cannot
// be obtained.
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
	// enqueue is the single place where notifications become graph changes.
	enqueue := func(t eventType, obj, oldObj interface{}) {
		gb.graphChanges.Add(&event{
			eventType: t,
			obj:       obj,
			oldObj:    oldObj,
			gvk:       kind,
		})
	}
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			enqueue(addEvent, obj, nil)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// TODO: check if there are differences in the ownerRefs,
			// finalizers, and DeletionTimestamp; if not, ignore the update.
			enqueue(updateEvent, newObj, oldObj)
		},
		DeleteFunc: func(obj interface{}) {
			// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
			if tombstone, wrapped := obj.(cache.DeletedFinalStateUnknown); wrapped {
				obj = tombstone.Obj
			}
			enqueue(deleteEvent, obj, nil)
		},
	}

	shared, sharedErr := gb.sharedInformers.ForResource(resource)
	if sharedErr == nil {
		glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
		shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
		if gb.stopCh != nil {
			// if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this
			// means we've re-loaded and re-read discovery and we are adding a new monitor for a
			// previously unseen resource, so we need to call Start on the shared informers again (this
			// will only start those shared informers that have not yet been started).
			go gb.sharedInformers.Start(gb.stopCh)
		}
		return shared.Informer().GetController(), nil
	}
	glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), sharedErr)

	// No shared informer: fall back to a dedicated informer for this resource.
	// TODO: consider store in one storage.
	glog.V(5).Infof("create storage for resource %s", resource)
	client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind)
	if err != nil {
		return nil, err
	}
	gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
	// The store returned by NewInformer is not needed; only the controller is kept.
	_, monitor := cache.NewInformer(
		listWatcher(client, resource),
		nil,
		ResourceResyncTime,
		handlers,
	)
	return monitor, nil
}
// monitorsForResources creates a monitor for every entry of resources and
// appends it to gb.monitors.
//
// Resources listed in gb.ignoredResources are skipped. A resource whose kind
// cannot be resolved by gb.restMapper is reported through
// utilruntime.HandleError and skipped as well. The first failure to build a
// controller aborts the loop and is returned; monitors appended before that
// point stay in gb.monitors.
func (gb *GraphBuilder) monitorsForResources(resources map[schema.GroupVersionResource]struct{}) error {
	for gvr := range resources {
		if _, skip := gb.ignoredResources[gvr.GroupResource()]; skip {
			glog.V(5).Infof("ignore resource %#v", gvr)
			continue
		}

		gvk, mapErr := gb.restMapper.KindFor(gvr)
		if mapErr != nil {
			hint := fmt.Sprintf(nonCoreMessage, gvr)
			utilruntime.HandleError(fmt.Errorf("%v. %s", mapErr, hint))
			continue
		}

		controller, buildErr := gb.controllerFor(gvr, gvk)
		if buildErr != nil {
			return buildErr
		}
		gb.monitors = append(gb.monitors, controller)
	}
	return nil
}
// HasSynced reports whether every monitor in gb.monitors has synced. It
// returns true when there are no monitors.
func (gb *GraphBuilder) HasSynced() bool {
	monitors := gb.monitors
	for i := range monitors {
		if synced := monitors[i].HasSynced(); !synced {
			return false
		}
	}
	return true
}
// Run starts every monitor in its own goroutine and starts the loop that
// drains graphChanges, restarting that loop one second after it returns until
// stopCh is closed. Run itself does not block.
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
	for _, m := range gb.monitors {
		go m.Run(stopCh)
	}
	go wait.Until(gb.runProcessGraphChanges, time.Second, stopCh)

	// Remember the stop channel so that shared informers added later (see
	// controllerFor) can be started with it.
	gb.stopCh = stopCh
}
// ignoredResources is the package's default set of group/resource pairs; it is
// what DefaultIgnoredResources hands out.
// NOTE(review): the reason each entry is excluded is not recorded here -
// confirm before adding or removing entries.
var ignoredResources = map[schema.GroupResource]struct{}{
{Group: "extensions", Resource: "replicationcontrollers"}: {},
{Group: "", Resource: "bindings"}: {},
{Group: "", Resource: "componentstatuses"}: {},
{Group: "", Resource: "events"}: {},
{Group: "authentication.k8s.io", Resource: "tokenreviews"}: {},
{Group: "authorization.k8s.io", Resource: "subjectaccessreviews"}: {},
{Group: "authorization.k8s.io", Resource: "selfsubjectaccessreviews"}: {},
{Group: "authorization.k8s.io", Resource: "localsubjectaccessreviews"}: {},
{Group: "apiregistration.k8s.io", Resource: "apiservices"}: {},
{Group: "apiextensions.k8s.io", Resource: "customresourcedefinitions"}: {},
}
// DefaultIgnoredResources returns the default set of resources that the garbage collector controller
// should ignore. This is exposed so downstream integrators can have access to the defaults, and add
// to them as necessary when constructing the controller.
//
// The returned map is a fresh copy on every call: callers are expected to add
// their own entries, and doing so must not alter the package-level defaults
// seen by other callers.
func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
	defaults := make(map[schema.GroupResource]struct{}, len(ignoredResources))
	for gr := range ignoredResources {
		defaults[gr] = struct{}{}
	}
	return defaults
}
// enqueueChanges adds e to the graphChanges queue, the same queue that the
// monitors' event handlers feed.
func (gb *GraphBuilder) enqueueChanges(e *event) {
gb.graphChanges.Add(e)
}
// addDependentToOwners registers n as a dependent of each owner in owners.
//
// An owner that is not yet present in gb.uidToNode is represented by a
// "virtual" node built from the owner reference and n's namespace. The virtual
// node is written to the graph and enqueued on attemptToDelete, so that
// processItem() will verify if the owner exists according to the API server;
// the garbage processor will enqueue a virtual delete event to remove it from
// the graph if the API server confirms the owner doesn't exist.
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
	for i := range owners {
		ref := owners[i]
		parent, known := gb.uidToNode.Read(ref.UID)
		if !known {
			parent = &node{
				identity: objectReference{
					OwnerReference: ref,
					Namespace:      n.identity.Namespace,
				},
				dependents: make(map[*node]struct{}),
			}
			glog.V(5).Infof("add virtual node.identity: %s\n\n", parent.identity)
			gb.uidToNode.Write(parent)
			gb.attemptToDelete.Add(parent)
		}
		parent.addDependent(n)
	}
}
// insertNode inserts the node into gb.uidToNode; then it finds all owners as
// listed in n.owners, and adds the node to their dependents lists.
func (gb *GraphBuilder) insertNode(n *node) {
// Write n first, then link it under each of its owners.
gb.uidToNode.Write(n)
gb.addDependentToOwners(n, n.owners)
}
// removeDependentFromOwners removes n from the dependents list of every owner
// in owners that is present in gb.uidToNode; owners not in the graph are
// skipped.
func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
	for _, ref := range owners {
		if parent, known := gb.uidToNode.Read(ref.UID); known {
			parent.deleteDependent(n)
		}
	}
}
// removeNode removes the node from gb.uidToNode, then finds all
// owners as listed in n.owners, and removes n from their dependents lists.
func (gb *GraphBuilder) removeNode(n *node) {
// Drop n from the graph first, then unlink it from each of its owners.
gb.uidToNode.Delete(n.identity.UID)
gb.removeDependentFromOwners(n, n.owners)
}
// ownerRefPair holds the old and new versions of an owner reference that
// appears, with the same UID, in both the old and new owner lists but whose
// contents differ (see referencesDiffs).
type ownerRefPair struct {
// oldRef is the reference as it was in the old owner list.
oldRef metav1.OwnerReference
// newRef is the reference as it is in the new owner list.
newRef metav1.OwnerReference
}
// referencesDiffs compares two owner reference lists, matching entries by UID.
//
// It returns the references whose UID appears only in new (added), those whose
// UID appears only in old (removed), and, for UIDs present in both, the
// old/new pair whenever the two references are not deeply equal (changed).
// The order of the returned slices follows map iteration and is unspecified.
// If a UID occurs more than once in a list, the last occurrence is used.
//
// TODO: profile this function to see if a naive N^2 algorithm performs better
// when the number of references is small.
func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
	// Index both lists by UID.
	oldByUID := make(map[string]metav1.OwnerReference)
	for _, ref := range old {
		oldByUID[string(ref.UID)] = ref
	}
	newByUID := make(map[string]metav1.OwnerReference)
	for _, ref := range new {
		newByUID[string(ref.UID)] = ref
	}
	oldUIDs, newUIDs := sets.StringKeySet(oldByUID), sets.StringKeySet(newByUID)

	for uid := range newUIDs.Difference(oldUIDs) {
		added = append(added, newByUID[uid])
	}
	for uid := range oldUIDs.Difference(newUIDs) {
		removed = append(removed, oldByUID[uid])
	}
	for uid := range oldUIDs.Intersection(newUIDs) {
		before, after := oldByUID[uid], newByUID[uid]
		if reflect.DeepEqual(before, after) {
			continue
		}
		changed = append(changed, ownerRefPair{oldRef: before, newRef: after})
	}
	return added, removed, changed
}
// deletionStarts reports whether the object in the event has just
// transitioned to "being deleted": the new object has a deletion timestamp
// and the old one does not.
//
// The delta_fifo may combine the creation and update of the object into one
// event, so when there is no oldObj the result is simply whether the new
// object (via newAccessor) is being deleted. If oldObj cannot be accessed the
// error is reported through utilruntime.HandleError and false is returned.
func deletionStarts(oldObj interface{}, newAccessor metav1.Object) bool {
	if oldObj == nil {
		return newAccessor.GetDeletionTimestamp() != nil
	}

	oldAccessor, err := meta.Accessor(oldObj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
		return false
	}

	if !beingDeleted(newAccessor) {
		return false
	}
	return !beingDeleted(oldAccessor)
}
// beingDeleted reports whether the object has a deletion timestamp set.
func beingDeleted(accessor metav1.Object) bool {
	deletedAt := accessor.GetDeletionTimestamp()
	return deletedAt != nil
}
// hasDeleteDependentsFinalizer reports whether the object's finalizers include
// metav1.FinalizerDeleteDependents.
func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
	names := accessor.GetFinalizers()
	for i := 0; i < len(names); i++ {
		if names[i] == metav1.FinalizerDeleteDependents {
			return true
		}
	}
	return false
}
// hasOrphanFinalizer reports whether the object's finalizers include
// metav1.FinalizerOrphanDependents.
func hasOrphanFinalizer(accessor metav1.Object) bool {
	for _, name := range accessor.GetFinalizers() {
		if name != metav1.FinalizerOrphanDependents {
			continue
		}
		return true
	}
	return false
}
// startsWaitingForDependentsDeleted reports whether the object has just
// started being deleted and carries the delete-dependents finalizer.
// It takes newAccessor directly because the caller already instantiates an
// accessor for the new object.
func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
	if !deletionStarts(oldObj, newAccessor) {
		return false
	}
	return hasDeleteDependentsFinalizer(newAccessor)
}
// startsWaitingForDependentsOrphaned reports whether the object has just
// started being deleted and carries the orphan-dependents finalizer.
// It takes newAccessor directly because the caller already instantiates an
// accessor for the new object.
func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
	if !deletionStarts(oldObj, newAccessor) {
		return false
	}
	return hasOrphanFinalizer(newAccessor)
}
// addUnblockedOwnersToDeleteQueue enqueues on attemptToDelete every owner that
// a dependent has stopped blocking: either a blocking owner reference was
// removed, or a reference that used to have BlockOwnerDeletion=true now has it
// unset or false. Owners that are not in gb.uidToNode are logged and skipped.
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
	// blocks reports whether ref has BlockOwnerDeletion explicitly set to true.
	blocks := func(ref metav1.OwnerReference) bool {
		return ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion
	}
	// enqueueOwner looks up the owner named by ref and queues it if known.
	enqueueOwner := func(ref metav1.OwnerReference) {
		owner, known := gb.uidToNode.Read(ref.UID)
		if !known {
			glog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
			return
		}
		gb.attemptToDelete.Add(owner)
	}

	for _, ref := range removed {
		if blocks(ref) {
			enqueueOwner(ref)
		}
	}
	for _, pair := range changed {
		if blocks(pair.oldRef) && !blocks(pair.newRef) {
			enqueueOwner(pair.newRef)
		}
	}
}
// processTransitions reacts to an object that has just started being deleted.
//
// If the object carries the orphan finalizer, n is enqueued on attemptToOrphan
// and nothing else happens. Otherwise, if it carries the delete-dependents
// finalizer, n is marked as deleting its dependents and every dependent,
// followed by n itself, is enqueued on attemptToDelete.
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
	switch {
	case startsWaitingForDependentsOrphaned(oldObj, newAccessor):
		glog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
		gb.attemptToOrphan.Add(n)
	case startsWaitingForDependentsDeleted(oldObj, newAccessor):
		glog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
		// if the n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
		n.markDeletingDependents()
		for dependent := range n.dependents {
			gb.attemptToDelete.Add(dependent)
		}
		gb.attemptToDelete.Add(n)
	}
}
// runProcessGraphChanges keeps processing graph changes until
// processGraphChanges reports that the queue has shut down.
func (gb *GraphBuilder) runProcessGraphChanges() {
	for {
		if more := gb.processGraphChanges(); !more {
			return
		}
	}
}
// processGraphChanges dequeues one event from graphChanges, updates the
// dependency graph accordingly, and enqueues affected nodes on
// attemptToDelete / attemptToOrphan.
//
// It returns false only when the graphChanges queue has been shut down; in
// every other case, including malformed items, it returns true so the caller
// keeps looping.
func (gb *GraphBuilder) processGraphChanges() bool {
	item, shutdown := gb.graphChanges.Get()
	if shutdown {
		return false
	}
	defer gb.graphChanges.Done(item)

	change, isEvent := item.(*event)
	if !isEvent {
		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
		return true
	}
	accessor, err := meta.Accessor(change.obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
		return true
	}
	glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, uid %s, event type %v", change.gvk.GroupVersion().String(), change.gvk.Kind, accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), change.eventType)

	// Check if the node already exists in the graph.
	current, known := gb.uidToNode.Read(accessor.GetUID())

	switch change.eventType {
	case addEvent, updateEvent:
		if !known {
			// First time this object is seen: build its node and link it in.
			identity := objectReference{
				OwnerReference: metav1.OwnerReference{
					APIVersion: change.gvk.GroupVersion().String(),
					Kind:       change.gvk.Kind,
					UID:        accessor.GetUID(),
					Name:       accessor.GetName(),
				},
				Namespace: accessor.GetNamespace(),
			}
			fresh := &node{
				identity:           identity,
				dependents:         make(map[*node]struct{}),
				owners:             accessor.GetOwnerReferences(),
				deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
				beingDeleted:       beingDeleted(accessor),
			}
			gb.insertNode(fresh)
			// the underlying delta_fifo may combine a creation and a deletion into
			// one event, so we need to further process the event.
			gb.processTransitions(change.oldObj, accessor, fresh)
		} else {
			// Known object: reconcile any change in its ownerReferences.
			added, removed, changed := referencesDiffs(current.owners, accessor.GetOwnerReferences())
			if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
				// check if the changed dependency graph unblocks owners that are
				// waiting for the deletion of their dependents.
				gb.addUnblockedOwnersToDeleteQueue(removed, changed)
				// update the node itself
				current.owners = accessor.GetOwnerReferences()
				// add the node to its new owners' dependent lists.
				gb.addDependentToOwners(current, added)
				// remove the node from the dependent lists of nodes that are no
				// longer in the node's owners list.
				gb.removeDependentFromOwners(current, removed)
			}
			if beingDeleted(accessor) {
				current.markBeingDeleted()
			}
			gb.processTransitions(change.oldObj, accessor, current)
		}
	case deleteEvent:
		if !known {
			glog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
			return true
		}
		// removeNode updates the graph
		gb.removeNode(current)
		// The read lock is held until this function returns.
		current.dependentsLock.RLock()
		defer current.dependentsLock.RUnlock()
		if len(current.dependents) > 0 {
			// Record the deleted owner as absent when it leaves dependents behind.
			gb.absentOwnerCache.Add(accessor.GetUID())
		}
		for dependent := range current.dependents {
			gb.attemptToDelete.Add(dependent)
		}
		for _, ref := range current.owners {
			parent, ok := gb.uidToNode.Read(ref.UID)
			if ok && parent.isDeletingDependents() {
				// this is to let the attemptToDelete worker check if all the
				// owner's dependents are deleted, if so, the owner will be deleted.
				gb.attemptToDelete.Add(parent)
			}
		}
	}
	return true
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.7.8

搜索帮助