1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
actual_state_of_world.go 18.45 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package cache implements data structures used by the attach/detach controller
to keep track of volumes, the nodes they are attached to, and the pods that
reference them.
*/
package cache
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// ActualStateOfWorld defines a set of thread-safe operations supported on
// the attach/detach controller's actual state of the world cache.
// This cache contains volumes->nodes i.e. a set of all volumes and the nodes
// the attach/detach controller believes are successfully attached.
// Note: This is distinct from the ActualStateOfWorld implemented by the kubelet
// volume manager. They both keep track of different objects. This contains
// attach/detach controller specific state.
type ActualStateOfWorld interface {
// ActualStateOfWorld must implement the methods required to allow
// operationexecutor to interact with it.
operationexecutor.ActualStateOfWorldAttacherUpdater
// AddVolumeNode adds the given volume and node to the underlying store
// indicating the specified volume is attached to the specified node.
// A unique volume name is generated from the volumeSpec and returned on
// success.
// If the volume/node combo already exists, the detachRequestedTime is reset
// to zero.
// If volumeSpec is not an attachable volume plugin, an error is returned.
// If no volume with the name volumeName exists in the store, the volume is
// added.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, the node is added.
AddVolumeNode(volumeSpec *volume.Spec, nodeName string, devicePath string) (api.UniqueVolumeName, error)
// SetVolumeMountedByNode sets the MountedByNode value for the given volume
// and node. When set to true this value indicates the volume is mounted by
// the given node, indicating it may not be safe to detach.
// If no volume with the name volumeName exists in the store, an error is
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error
// MarkDesireToDetach returns the difference between the current time and
// the DetachRequestedTime for the given volume/node combo. If the
// DetachRequestedTime is zero, it is set to the current time.
// If no volume with the name volumeName exists in the store, an error is
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
MarkDesireToDetach(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error)
// ResetNodeStatusUpdateNeeded resets statusUpdateNeeded for the specified
// node to false indicating the AttachedVolume field of the Node's Status
// object has been updated.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
ResetNodeStatusUpdateNeeded(nodeName string) error
// DeleteVolumeNode removes the given volume and node from the underlying
// store indicating the specified volume is no longer attached to the
// specified node.
// If the volume/node combo does not exist, this is a no-op.
// If after deleting the node, the specified volume contains no other child
// nodes, the volume is also deleted.
DeleteVolumeNode(volumeName api.UniqueVolumeName, nodeName string)
// VolumeNodeExists returns true if the specified volume/node combo exists
// in the underlying store indicating the specified volume is attached to
// the specified node.
VolumeNodeExists(volumeName api.UniqueVolumeName, nodeName string) bool
// GetAttachedVolumes generates and returns a list of volumes/node pairs
// reflecting which volumes are attached to which nodes based on the
// current actual state of the world.
GetAttachedVolumes() []AttachedVolume
// GetAttachedVolumes generates and returns a list of volumes attached to
// the specified node reflecting which volumes are attached to that node
// based on the current actual state of the world.
GetAttachedVolumesForNode(nodeName string) []AttachedVolume
// GetVolumesToReportAttached returns a map containing the set of nodes for
// which the VolumesAttached Status field in the Node API object should be
// updated. The key in this map is the name of the node to update and the
// value is list of volumes that should be reported as attached (note that
// this may differ from the actual list of attached volumes for the node
// since volumes should be removed from this list as soon a detach operation
// is considered, before the detach operation is triggered).
GetVolumesToReportAttached() map[string][]api.AttachedVolume
}
// AttachedVolume represents a volume that is attached to a node.
type AttachedVolume struct {
operationexecutor.AttachedVolume
// MountedByNode indicates that this volume has been been mounted by the
// node and is unsafe to detach.
// The value is set and unset by SetVolumeMountedByNode(...).
MountedByNode bool
// DetachRequestedTime is used to capture the desire to detach this volume.
// When the volume is newly created this value is set to time zero.
// It is set to current time, when MarkDesireToDetach(...) is called, if it
// was previously set to zero (other wise its value remains the same).
// It is reset to zero on AddVolumeNode(...) calls.
DetachRequestedTime time.Time
}
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld.
func NewActualStateOfWorld(volumePluginMgr *volume.VolumePluginMgr) ActualStateOfWorld {
return &actualStateOfWorld{
attachedVolumes: make(map[api.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[string]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
}
type actualStateOfWorld struct {
// attachedVolumes is a map containing the set of volumes the attach/detach
// controller believes to be successfully attached to the nodes it is
// managing. The key in this map is the name of the volume and the value is
// an object containing more information about the attached volume.
attachedVolumes map[api.UniqueVolumeName]attachedVolume
// nodesToUpdateStatusFor is a map containing the set of nodes for which to
// update the VolumesAttached Status field. The key in this map is the name
// of the node and the value is an object containing more information about
// the node (including the list of volumes to report attached).
nodesToUpdateStatusFor map[string]nodeToUpdateStatusFor
// volumePluginMgr is the volume plugin manager used to create volume
// plugin objects.
volumePluginMgr *volume.VolumePluginMgr
sync.RWMutex
}
// The volume object represents a volume the the attach/detach controller
// believes to be successfully attached to a node it is managing.
type attachedVolume struct {
// volumeName contains the unique identifier for this volume.
volumeName api.UniqueVolumeName
// spec is the volume spec containing the specification for this volume.
// Used to generate the volume plugin object, and passed to attach/detach
// methods.
spec *volume.Spec
// nodesAttachedTo is a map containing the set of nodes this volume has
// successfully been attached to. The key in this map is the name of the
// node and the value is a node object containing more information about
// the node.
nodesAttachedTo map[string]nodeAttachedTo
// devicePath contains the path on the node where the volume is attached
devicePath string
}
// The nodeAttachedTo object represents a node that has volumes attached to it.
type nodeAttachedTo struct {
// nodeName contains the name of this node.
nodeName string
// mountedByNode indicates that this node/volume combo is mounted by the
// node and is unsafe to detach
mountedByNode bool
// number of times SetVolumeMountedByNode has been called to set the value
// of mountedByNode to true. This is used to prevent mountedByNode from
// being reset during the period between attach and mount when volumesInUse
// status for the node may not be set.
mountedByNodeSetCount uint
// detachRequestedTime used to capture the desire to detach this volume
detachRequestedTime time.Time
}
// nodeToUpdateStatusFor is an object that reflects a node that has one or more
// volume attached. It keeps track of the volumes that should be reported as
// attached in the Node's Status API object.
type nodeToUpdateStatusFor struct {
// nodeName contains the name of this node.
nodeName string
// statusUpdateNeeded indicates that the value of the VolumesAttached field
// in the Node's Status API object should be updated. This should be set to
// true whenever a volume is added or deleted from
// volumesToReportAsAttached. It should be reset whenever the status is
// updated.
statusUpdateNeeded bool
// volumesToReportAsAttached is the list of volumes that should be reported
// as attached in the Node's status (note that this may differ from the
// actual list of attached volumes since volumes should be removed from this
// list as soon a detach operation is considered, before the detach
// operation is triggered).
volumesToReportAsAttached map[api.UniqueVolumeName]api.UniqueVolumeName
}
func (asw *actualStateOfWorld) MarkVolumeAsAttached(
_ api.UniqueVolumeName, volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
return err
}
func (asw *actualStateOfWorld) MarkVolumeAsDetached(
volumeName api.UniqueVolumeName, nodeName string) {
asw.DeleteVolumeNode(volumeName, nodeName)
}
func (asw *actualStateOfWorld) AddVolumeNode(
volumeSpec *volume.Spec, nodeName string, devicePath string) (api.UniqueVolumeName, error) {
asw.Lock()
defer asw.Unlock()
attachableVolumePlugin, err := asw.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
return "", fmt.Errorf(
"failed to get AttachablePlugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q err=%v",
volumeSpec.Name(),
err)
}
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
volumeObj = attachedVolume{
volumeName: volumeName,
spec: volumeSpec,
nodesAttachedTo: make(map[string]nodeAttachedTo),
devicePath: devicePath,
}
asw.attachedVolumes[volumeName] = volumeObj
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
// Create object if it doesn't exist.
volumeObj.nodesAttachedTo[nodeName] = nodeAttachedTo{
nodeName: nodeName,
mountedByNode: true, // Assume mounted, until proven otherwise
mountedByNodeSetCount: 0,
detachRequestedTime: time.Time{},
}
} else if !nodeObj.detachRequestedTime.IsZero() {
// Reset detachRequestedTime values if object exists and time is non-zero
nodeObj.detachRequestedTime = time.Time{}
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists {
// Create object if it doesn't exist
nodeToUpdate = nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if !nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
return volumeName, nil
}
func (asw *actualStateOfWorld) SetVolumeMountedByNode(
volumeName api.UniqueVolumeName, nodeName string, mounted bool) error {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return fmt.Errorf(
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) volumeName does not exist",
volumeName,
nodeName,
mounted)
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return fmt.Errorf(
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) nodeName does not exist",
volumeName,
nodeName,
mounted)
}
if mounted {
// Increment set count
nodeObj.mountedByNodeSetCount = nodeObj.mountedByNodeSetCount + 1
} else {
// Do not allow value to be reset unless it has been set at least once
if nodeObj.mountedByNodeSetCount == 0 {
return nil
}
}
nodeObj.mountedByNode = mounted
volumeObj.nodesAttachedTo[nodeName] = nodeObj
return nil
}
func (asw *actualStateOfWorld) MarkDesireToDetach(
volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) volumeName does not exist",
volumeName,
nodeName)
}
nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) nodeName does not exist",
volumeName,
nodeName)
}
if nodeObj.detachRequestedTime.IsZero() {
nodeObj.detachRequestedTime = time.Now()
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}
// Remove volume from volumes to report as attached
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
delete(nodeToUpdate.volumesToReportAsAttached, volumeName)
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
}
return time.Since(volumeObj.nodesAttachedTo[nodeName].detachRequestedTime), nil
}
func (asw *actualStateOfWorld) ResetNodeStatusUpdateNeeded(
nodeName string) error {
asw.Lock()
defer asw.Unlock()
// Remove volume from volumes to report as attached
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists {
return fmt.Errorf(
"failed to ResetNodeStatusUpdateNeeded(nodeName=%q) nodeName does not exist",
nodeName)
}
nodeToUpdate.statusUpdateNeeded = false
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
return nil
}
func (asw *actualStateOfWorld) DeleteVolumeNode(
volumeName api.UniqueVolumeName, nodeName string) {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return
}
_, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if nodeExists {
delete(asw.attachedVolumes[volumeName].nodesAttachedTo, nodeName)
}
if len(volumeObj.nodesAttachedTo) == 0 {
delete(asw.attachedVolumes, volumeName)
}
// Remove volume from volumes to report as attached
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
delete(nodeToUpdate.volumesToReportAsAttached, volumeName)
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
}
}
func (asw *actualStateOfWorld) VolumeNodeExists(
volumeName api.UniqueVolumeName, nodeName string) bool {
asw.RLock()
defer asw.RUnlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if volumeExists {
if _, nodeExists := volumeObj.nodesAttachedTo[nodeName]; nodeExists {
return true
}
}
return false
}
func (asw *actualStateOfWorld) GetAttachedVolumes() []AttachedVolume {
asw.RLock()
defer asw.RUnlock()
attachedVolumes := make([]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for _, nodeObj := range volumeObj.nodesAttachedTo {
attachedVolumes = append(
attachedVolumes,
getAttachedVolume(&volumeObj, &nodeObj))
}
}
return attachedVolumes
}
func (asw *actualStateOfWorld) GetAttachedVolumesForNode(
nodeName string) []AttachedVolume {
asw.RLock()
defer asw.RUnlock()
attachedVolumes := make(
[]AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */)
for _, volumeObj := range asw.attachedVolumes {
for actualNodeName, nodeObj := range volumeObj.nodesAttachedTo {
if actualNodeName == nodeName {
attachedVolumes = append(
attachedVolumes,
getAttachedVolume(&volumeObj, &nodeObj))
}
}
}
return attachedVolumes
}
func (asw *actualStateOfWorld) GetVolumesToReportAttached() map[string][]api.AttachedVolume {
asw.RLock()
defer asw.RUnlock()
volumesToReportAttached := make(map[string][]api.AttachedVolume)
for _, nodeToUpdateObj := range asw.nodesToUpdateStatusFor {
if nodeToUpdateObj.statusUpdateNeeded {
attachedVolumes := make(
[]api.AttachedVolume,
len(nodeToUpdateObj.volumesToReportAsAttached) /* len */)
i := 0
for _, volume := range nodeToUpdateObj.volumesToReportAsAttached {
attachedVolumes[i] = api.AttachedVolume{
Name: volume,
DevicePath: asw.attachedVolumes[volume].devicePath,
}
i++
}
volumesToReportAttached[nodeToUpdateObj.nodeName] = attachedVolumes
}
}
return volumesToReportAttached
}
func getAttachedVolume(
attachedVolume *attachedVolume,
nodeAttachedTo *nodeAttachedTo) AttachedVolume {
return AttachedVolume{
AttachedVolume: operationexecutor.AttachedVolume{
VolumeName: attachedVolume.volumeName,
VolumeSpec: attachedVolume.spec,
NodeName: nodeAttachedTo.nodeName,
PluginIsAttachable: true,
},
MountedByNode: nodeAttachedTo.mountedByNode,
DetachRequestedTime: nodeAttachedTo.detachRequestedTime}
}
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.4.0-alpha.2

搜索帮助