Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
// Package attachdetach implements a controller to manage volume attach and detach
// operations.
package attachdetach
import (
authenticationv1 "k8s.io/api/authentication/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
kcache "k8s.io/client-go/tools/cache"
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
// TimerConfig contains configuration of internal attach/detach timers and
// should be used only to speed up tests. DefaultTimerConfig is the suggested
// timer configuration for production.
type TimerConfig struct {
// ReconcilerLoopPeriod is the amount of time the reconciler loop waits
// between successive executions
ReconcilerLoopPeriod time.Duration
// ReconcilerMaxWaitForUnmountDuration is the maximum amount of time the
// attach detach controller will wait for a volume to be safely unmounted
// from its node. Once this time has expired, the controller will assume the
// node or kubelet are unresponsive and will detach the volume anyway.
ReconcilerMaxWaitForUnmountDuration time.Duration
// DesiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
DesiredStateOfWorldPopulatorLoopSleepPeriod time.Duration
// DesiredStateOfWorldPopulatorListPodsRetryDuration is the amount of
// time the DesiredStateOfWorldPopulator loop waits between list pods
// calls.
DesiredStateOfWorldPopulatorListPodsRetryDuration time.Duration
// DefaultTimerConfig is the default configuration of Attach/Detach controller
// timers.
var DefaultTimerConfig TimerConfig = TimerConfig{
ReconcilerLoopPeriod: 100 * time.Millisecond,
ReconcilerMaxWaitForUnmountDuration: 6 * time.Minute,
DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Minute,
DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Minute,
// AttachDetachController defines the operations supported by this controller.
type AttachDetachController interface {
Run(stopCh <-chan struct{})
GetDesiredStateOfWorld() cache.DesiredStateOfWorld
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(
kubeClient clientset.Interface,
csiClient csiclient.Interface,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin,
prober volume.DynamicPluginProber,
disableReconciliationSync bool,
reconcilerSyncDuration time.Duration,
timerConfig TimerConfig) (AttachDetachController, error) {
// TODO: The default resyncPeriod for shared informers is 12 hours, this is
// unacceptable for the attach/detach controller. For example, if a pod is
// skipped because the node it is scheduled to didn't set its annotation in
// time, we don't want to have to wait 12hrs before processing the pod
// again.
// Luckily https://github.com/kubernetes/kubernetes/issues/23394 is being
// worked on and will split resync in to resync and relist. Once that
// happens the resync period can be set to something much faster (30
// seconds).
// If that issue is not resolved in time, then this controller will have to
// consider some unappealing alternate options: use a non-shared informer
// and set a faster resync period even if it causes relist, or requeue
// dropped pods so they are continuously processed until it is accepted or
// deleted (probably can't do this with sharedInformer), etc.
adc := &attachDetachController{
kubeClient: kubeClient,
csiClient: csiClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
podIndexer: podInformer.Informer().GetIndexer(),
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumepathhandler.NewBlockVolumePathHandler()
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
adc.attacherDetacher =
false, // flag for experimental binary check for volume mount
adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater(
kubeClient, nodeInformer.Lister(), adc.actualStateOfWorld)
// Default these to values in options
adc.reconciler = reconciler.NewReconciler(
adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
AddFunc: adc.podAdd,
UpdateFunc: adc.podUpdate,
DeleteFunc: adc.podDelete,
// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
pvcKeyIndex: indexByPVCKey,
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
AddFunc: func(obj interface{}) {
UpdateFunc: func(old, new interface{}) {
return adc, nil
const (
pvcKeyIndex string = "pvcKey"
// indexByPVCKey returns PVC keys for given pod. Note that the index is only
// used for attaching, so we are only interested in active pods with nodeName
// set.
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
return keys, nil
type attachDetachController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
kubeClient clientset.Interface
// csiClient is the csi.storage.k8s.io API client used by volumehost to communicate with
// the API server.
csiClient csiclient.Interface
// pvcLister is the shared PVC lister used to fetch and store PVC
// objects from the API server. It is shared with other controllers and
// therefore the PVC objects in its store should be treated as immutable.
pvcLister corelisters.PersistentVolumeClaimLister
pvcsSynced kcache.InformerSynced
// pvLister is the shared PV lister used to fetch and store PV objects
// from the API server. It is shared with other controllers and therefore
// the PV objects in its store should be treated as immutable.
pvLister corelisters.PersistentVolumeLister
pvsSynced kcache.InformerSynced
podLister corelisters.PodLister
podsSynced kcache.InformerSynced
podIndexer kcache.Indexer
nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced
// cloud provider used by volume host
cloud cloudprovider.Interface
// volumePluginMgr used to initialize and fetch volume plugins
volumePluginMgr volume.VolumePluginMgr
// desiredStateOfWorld is a data structure containing the desired state of
// the world according to this controller: i.e. what nodes the controller
// is managing, what volumes it wants be attached to these nodes, and which
// pods are scheduled to those nodes referencing the volumes.
// The data structure is populated by the controller using a stream of node
// and pod API server objects fetched by the informers.
desiredStateOfWorld cache.DesiredStateOfWorld
// actualStateOfWorld is a data structure containing the actual state of
// the world according to this controller: i.e. which volumes are attached
// to which nodes.
// The data structure is populated upon successful completion of attach and
// detach actions triggered by the controller and a periodic sync with
// storage providers for the "true" state of the world.
actualStateOfWorld cache.ActualStateOfWorld
// attacherDetacher is used to start asynchronous attach and operations
attacherDetacher operationexecutor.OperationExecutor
// reconciler is used to run an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering attach
// detach operations using the attacherDetacher.
reconciler reconciler.Reconciler
// nodeStatusUpdater is used to update node status with the list of attached
// volumes
nodeStatusUpdater statusupdater.NodeStatusUpdater
// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
// populate the current pods using podInformer.
desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
// recorder is used to record events in the API server
recorder record.EventRecorder
// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer adc.pvcQueue.ShutDown()
glog.Infof("Starting attach detach controller")
defer glog.Infof("Shutting down attach detach controller")
if !controller.WaitForCacheSync("attach detach", stopCh, adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced) {
err := adc.populateActualStateOfWorld()
if err != nil {
glog.Errorf("Error populating the actual state of world: %v", err)
err = adc.populateDesiredStateOfWorld()
if err != nil {
glog.Errorf("Error populating the desired state of world: %v", err)
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
go wait.Until(adc.pvcWorker, time.Second, stopCh)
func (adc *attachDetachController) populateActualStateOfWorld() error {
glog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.nodeLister.List(labels.Everything())
if err != nil {
return err
for _, node := range nodes {
nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached {
uniqueName := attachedVolume.Name
// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
// In such a case it should be detached in the first reconciliation cycle and the
// volume spec is not needed to detach a volume. If the volume is used by a pod, it
// its spec can be: this would happen during in the populateDesiredStateOfWorld which
// scans the pods and updates their volumes in the ActualStateOfWorld too.
err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
if err != nil {
glog.Errorf("Failed to mark the volume as attached: %v", err)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.addNodeToDswp(node, types.NodeName(node.Name))
return nil
func (adc *attachDetachController) getNodeVolumeDevicePath(
volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
var devicePath string
var found bool
node, err := adc.nodeLister.Get(string(nodeName))
if err != nil {
return devicePath, err
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeName == attachedVolume.Name {
devicePath = attachedVolume.DevicePath
found = true
if !found {
err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
return devicePath, err
func (adc *attachDetachController) populateDesiredStateOfWorld() error {
glog.V(5).Infof("Populating DesiredStateOfworld")
pods, err := adc.podLister.List(labels.Everything())
if err != nil {
return err
for _, pod := range pods {
podToAdd := pod
for _, podVolume := range podToAdd.Spec.Volumes {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The present in the ASW with no corresponding
// pod will be detached and the spec is irrelevant.
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
if err != nil {
"Error creating spec for volume %q, pod %q/%q: %v",
nodeName := types.NodeName(podToAdd.Spec.NodeName)
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || plugin == nil {
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
"Failed to find unique name for volume %q, pod %q/%q: %v",
if adc.actualStateOfWorld.VolumeNodeExists(volumeName, nodeName) {
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
glog.Errorf("Failed to find device path: %v", err)
err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
if err != nil {
glog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
return nil
func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
if pod.Spec.NodeName == "" {
// Ignore pods without NodeName, indicating they are not scheduled.
volumeActionFlag := util.DetermineVolumeAction(
true /* default volume action */)
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
// GetDesiredStateOfWorld returns desired state of world associated with controller
func (adc *attachDetachController) GetDesiredStateOfWorld() cache.DesiredStateOfWorld {
return adc.desiredStateOfWorld
func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
pod, ok := newObj.(*v1.Pod)
if pod == nil || !ok {
if pod.Spec.NodeName == "" {
// Ignore pods without NodeName, indicating they are not scheduled.
volumeActionFlag := util.DetermineVolumeAction(
true /* default volume action */)
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
func (adc *attachDetachController) podDelete(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
util.ProcessPodVolumes(pod, false, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
func (adc *attachDetachController) nodeAdd(obj interface{}) {
node, ok := obj.(*v1.Node)
// TODO: investigate if nodeName is empty then if we can return
// kubernetes/kubernetes/issues/37777
if node == nil || !ok {
nodeName := types.NodeName(node.Name)
adc.nodeUpdate(nil, obj)
// kubernetes/kubernetes/issues/37586
// This is to workaround the case when a node add causes to wipe out
// the attached volumes field. This function ensures that we sync with
// the actual status.
func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
node, ok := newObj.(*v1.Node)
// TODO: investigate if nodeName is empty then if we can return
if node == nil || !ok {
nodeName := types.NodeName(node.Name)
adc.addNodeToDswp(node, nodeName)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
func (adc *attachDetachController) nodeDelete(obj interface{}) {
node, ok := obj.(*v1.Node)
if node == nil || !ok {
nodeName := types.NodeName(node.Name)
if err := adc.desiredStateOfWorld.DeleteNode(nodeName); err != nil {
// This might happen during drain, but we still want it to appear in our logs
glog.Infof("error removing node %q from desired-state-of-world: %v", nodeName, err)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
func (adc *attachDetachController) enqueuePVC(obj interface{}) {
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
// pvcWorker processes items from pvcQueue
func (adc *attachDetachController) pvcWorker() {
for adc.processNextItem() {
func (adc *attachDetachController) processNextItem() bool {
keyObj, shutdown := adc.pvcQueue.Get()
if shutdown {
return false
defer adc.pvcQueue.Done(keyObj)
if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
return true
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
return true
func (adc *attachDetachController) syncPVCByKey(key string) error {
glog.V(5).Infof("syncPVCByKey[%s]", key)
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
return nil
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
if apierrors.IsNotFound(err) {
glog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
return nil
if err != nil {
return err
if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
// Skip unbound PVCs.
return nil
objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
if err != nil {
return err
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
volumeActionFlag := util.DetermineVolumeAction(
true /* default volume action */)
util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
return nil
// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
for _, volumeInUse := range volumesInUse {
if attachedVolume.VolumeName == volumeInUse {
mounted = true
err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
if err != nil {
"SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
attachedVolume.VolumeName, nodeName, mounted, err)
// VolumeHost implementation
// This is an unfortunate requirement of the current factoring of volume plugin
// initializing code. It requires kubelet specific methods used by the mounting
// code to be implemented by all initializers even if the initializer does not
// do mounting (like this attach/detach controller).
// Issue kubernetes/kubernetes/issues/14217 to fix this.
func (adc *attachDetachController) GetPluginDir(podUID string) string {
return ""
func (adc *attachDetachController) GetVolumeDevicePluginDir(podUID string) string {
return ""
func (adc *attachDetachController) GetPodsDir() string {
return ""
func (adc *attachDetachController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
func (adc *attachDetachController) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
func (adc *attachDetachController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return ""
func (adc *attachDetachController) GetKubeClient() clientset.Interface {
return adc.kubeClient
func (adc *attachDetachController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
return nil, fmt.Errorf("NewWrapperMounter not supported by Attach/Detach controller's VolumeHost implementation")
func (adc *attachDetachController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
return nil, fmt.Errorf("NewWrapperUnmounter not supported by Attach/Detach controller's VolumeHost implementation")
func (adc *attachDetachController) GetCloudProvider() cloudprovider.Interface {
return adc.cloud
func (adc *attachDetachController) GetMounter(pluginName string) mount.Interface {
return nil
func (adc *attachDetachController) GetHostName() string {
return ""
func (adc *attachDetachController) GetHostIP() (net.IP, error) {
return nil, fmt.Errorf("GetHostIP() not supported by Attach/Detach controller's VolumeHost implementation")
func (adc *attachDetachController) GetNodeAllocatable() (v1.ResourceList, error) {
return v1.ResourceList{}, nil
func (adc *attachDetachController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return func(_, _ string) (*v1.Secret, error) {
return nil, fmt.Errorf("GetSecret unsupported in attachDetachController")
func (adc *attachDetachController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
return func(_, _ string) (*v1.ConfigMap, error) {
return nil, fmt.Errorf("GetConfigMap unsupported in attachDetachController")
func (adc *attachDetachController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
return nil, fmt.Errorf("GetServiceAccountToken unsupported in attachDetachController")
func (adc *attachDetachController) GetExec(pluginName string) mount.Exec {
return mount.NewOsExec()
func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
keepTerminatedPodVolumes := false
if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
keepTerminatedPodVolumes = (t == "true")
// Node specifies annotation indicating it should be managed by attach
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) {
return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller")
func (adc *attachDetachController) GetNodeName() types.NodeName {
return ""
func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
return adc.recorder
func (adc *attachDetachController) GetCSIClient() csiclient.Interface {
return adc.csiClient
