1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
azure_vmss.go 34.25 KB
一键复制 编辑 原始数据 按行查看 历史
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package azure
import (
cloudprovider "k8s.io/cloud-provider"
var (
// ErrorNotVmssInstance indicates an instance is not belongint to any vmss.
ErrorNotVmssInstance = errors.New("not a vmss instance")
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
resourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines(?:.*)`)
vmssNicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines/(?:.*)/networkInterfaces/(?:.*)`)
vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s"
// scaleSet implements VMSet interface for Azure scale set.
type scaleSet struct {
// availabilitySet is also required for scaleSet because some instances
// (e.g. master nodes) may not belong to any scale sets.
availabilitySet VMSet
vmssCache *timedCache
vmssVMCache *timedCache
nodeNameToScaleSetMappingCache *timedCache
availabilitySetNodesCache *timedCache
// newScaleSet creates a new scaleSet.
func newScaleSet(az *Cloud) (VMSet, error) {
var err error
ss := &scaleSet{
Cloud: az,
availabilitySet: newAvailabilitySet(az),
ss.nodeNameToScaleSetMappingCache, err = ss.newNodeNameToScaleSetMappingCache()
if err != nil {
return nil, err
ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
if err != nil {
return nil, err
ss.vmssCache, err = ss.newVmssCache()
if err != nil {
return nil, err
ss.vmssVMCache, err = ss.newVmssVMCache()
if err != nil {
return nil, err
return ss, nil
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
instanceID, err = getScaleSetVMInstanceID(nodeName)
if err != nil {
return ssName, instanceID, vm, err
ssName, err = ss.getScaleSetNameByNodeName(nodeName)
if err != nil {
return ssName, instanceID, vm, err
if ssName == "" {
return "", "", vm, cloudprovider.InstanceNotFound
resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
if err != nil {
return "", "", vm, err
klog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID))
cachedVM, err := ss.vmssVMCache.Get(key)
if err != nil {
return ssName, instanceID, vm, err
if cachedVM == nil {
klog.Errorf("Can't find node (%q) in any scale sets", nodeName)
return ssName, instanceID, vm, cloudprovider.InstanceNotFound
return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
// GetPowerStatusByNodeName returns the power state of the specified node.
func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return powerState, err
if vm.InstanceView != nil && vm.InstanceView.Statuses != nil {
statuses := *vm.InstanceView.Statuses
for _, status := range statuses {
state := to.String(status.Code)
if strings.HasPrefix(state, vmPowerStatePrefix) {
return strings.TrimPrefix(state, vmPowerStatePrefix), nil
return "", fmt.Errorf("failed to get power status for node %q", name)
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
// The node must belong to one of scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
key := buildVmssCacheKey(resourceGroup, vmName)
cachedVM, err := ss.vmssVMCache.Get(key)
if err != nil {
return vm, err
if cachedVM == nil {
klog.Errorf("couldn't find vmss virtual machine by scaleSetName (%s) and instanceID (%s)", scaleSetName, instanceID)
return vm, cloudprovider.InstanceNotFound
return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
// not exist or is no longer running.
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return "", err
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetInstanceIDByNodeName(name)
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return "", err
resourceID := *vm.ID
convertedResourceID, err := convertResourceGroupNameToLower(resourceID)
if err != nil {
klog.Errorf("convertResourceGroupNameToLower failed with error: %v", err)
return "", err
return convertedResourceID, nil
// GetNodeNameByProviderID gets the node name by provider ID.
func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
// NodeName is not part of providerID for vmss instances.
scaleSetName, err := extractScaleSetNameByProviderID(providerID)
if err != nil {
klog.V(4).Infof("Can not extract scale set name from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
resourceGroup, err := extractResourceGroupByProviderID(providerID)
if err != nil {
return "", fmt.Errorf("error of extracting resource group for node %q", providerID)
instanceID, err := getLastSegment(providerID)
if err != nil {
klog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID)
if err != nil {
return "", err
if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
nodeName := strings.ToLower(*vm.OsProfile.ComputerName)
return types.NodeName(nodeName), nil
return "", nil
// GetInstanceTypeByNodeName gets the instance type by node name.
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return "", err
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetInstanceTypeByNodeName(name)
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return "", err
if vm.Sku != nil && vm.Sku.Name != nil {
return *vm.Sku.Name, nil
return "", nil
// GetZoneByNodeName gets availability zone for the specified node. If the node is not running
// with availability zone, then it returns fault domain.
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return cloudprovider.Zone{}, err
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetZoneByNodeName(name)
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return cloudprovider.Zone{}, err
var failureDomain string
if vm.Zones != nil && len(*vm.Zones) > 0 {
// Get availability zone for the node.
zones := *vm.Zones
zoneID, err := strconv.Atoi(zones[0])
if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("failed to parse zone %q: %v", zones, err)
failureDomain = ss.makeZone(zoneID)
} else if vm.InstanceView != nil && vm.InstanceView.PlatformFaultDomain != nil {
// Availability zone is not used for the node, falling back to fault domain.
failureDomain = strconv.Itoa(int(*vm.InstanceView.PlatformFaultDomain))
return cloudprovider.Zone{
FailureDomain: failureDomain,
Region: *vm.Location,
}, nil
// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
func (ss *scaleSet) GetPrimaryVMSetName() string {
return ss.Config.PrimaryScaleSetName
// GetIPByNodeName gets machine private IP and public IP by node name.
func (ss *scaleSet) GetIPByNodeName(nodeName string) (string, string, error) {
nic, err := ss.GetPrimaryInterface(nodeName)
if err != nil {
klog.Errorf("error: ss.GetIPByNodeName(%s), GetPrimaryInterface(%q), err=%v", nodeName, nodeName, err)
return "", "", err
ipConfig, err := getPrimaryIPConfig(nic)
if err != nil {
klog.Errorf("error: ss.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
return "", "", err
internalIP := *ipConfig.PrivateIPAddress
publicIP := ""
if ipConfig.PublicIPAddress != nil && ipConfig.PublicIPAddress.ID != nil {
pipID := *ipConfig.PublicIPAddress.ID
pipName, err := getLastSegment(pipID)
if err != nil {
return "", "", fmt.Errorf("failed to get publicIP name for node %q with pipID %q", nodeName, pipID)
resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
if err != nil {
return "", "", err
pip, existsPip, err := ss.getPublicIPAddress(resourceGroup, pipName)
if err != nil {
return "", "", err
if existsPip {
publicIP = *pip.IPAddress
return internalIP, publicIP, nil
// This returns the full identifier of the primary NIC for the given VM.
func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSetVM) (string, error) {
if len(*machine.NetworkProfile.NetworkInterfaces) == 1 {
return *(*machine.NetworkProfile.NetworkInterfaces)[0].ID, nil
for _, ref := range *machine.NetworkProfile.NetworkInterfaces {
if *ref.Primary {
return *ref.ID, nil
return "", fmt.Errorf("failed to find a primary nic for the vm. vmname=%q", *machine.Name)
// machineName is composed of computerNamePrefix and 36-based instanceID.
// And instanceID part if in fixed length of 6 characters.
// Refer https://msftstack.wordpress.com/2017/05/10/figuring-out-azure-vm-scale-set-machine-names/.
func getScaleSetVMInstanceID(machineName string) (string, error) {
nameLength := len(machineName)
if nameLength < 6 {
return "", ErrorNotVmssInstance
instanceID, err := strconv.ParseUint(machineName[nameLength-6:], 36, 64)
if err != nil {
return "", ErrorNotVmssInstance
return fmt.Sprintf("%d", instanceID), nil
// extractScaleSetNameByProviderID extracts the scaleset name by vmss node's ProviderID.
func extractScaleSetNameByProviderID(providerID string) (string, error) {
matches := scaleSetNameRE.FindStringSubmatch(providerID)
if len(matches) != 2 {
return "", ErrorNotVmssInstance
return matches[1], nil
// extractResourceGroupByProviderID extracts the resource group name by vmss node's ProviderID.
func extractResourceGroupByProviderID(providerID string) (string, error) {
matches := resourceGroupRE.FindStringSubmatch(providerID)
if len(matches) != 2 {
return "", ErrorNotVmssInstance
return matches[1], nil
// listScaleSets lists all scale sets.
func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
var err error
ctx, cancel := getContextWithCancel()
defer cancel()
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, resourceGroup)
if err != nil {
klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
return nil, err
ssNames := make([]string, len(allScaleSets))
for i := range allScaleSets {
ssNames[i] = *(allScaleSets[i].Name)
return ssNames, nil
// listScaleSetVMs lists VMs belonging to the specified scale set.
func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
var err error
ctx, cancel := getContextWithCancel()
defer cancel()
allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
if err != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err)
return nil, err
return allVMs, nil
// getAgentPoolScaleSets lists the virtual machines for the resource group and then builds
// a list of scale sets that match the nodes available to k8s.
func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
agentPoolScaleSets := &[]string{}
for nx := range nodes {
if isMasterNode(nodes[nx]) {
if ss.ShouldNodeExcludedFromLoadBalancer(nodes[nx]) {
nodeName := nodes[nx].Name
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
if err != nil {
return nil, err
if ssName == "" {
klog.V(3).Infof("Node %q is not belonging to any known scale sets", nodeName)
*agentPoolScaleSets = append(*agentPoolScaleSets, ssName)
return agentPoolScaleSets, nil
// GetVMSetNames selects all possible availability sets or scale sets
// (depending vmType configured) for service load balancer. If the service has
// no loadbalancer mode annotation returns the primary VMSet. If service annotation
// for loadbalancer exists then return the eligible VMSet.
func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetNames *[]string, err error) {
hasMode, isAuto, serviceVMSetNames := getServiceLoadBalancerMode(service)
if !hasMode {
// no mode specified in service annotation default to PrimaryScaleSetName.
scaleSetNames := &[]string{ss.Config.PrimaryScaleSetName}
return scaleSetNames, nil
scaleSetNames, err := ss.getAgentPoolScaleSets(nodes)
if err != nil {
klog.Errorf("ss.GetVMSetNames - getAgentPoolScaleSets failed err=(%v)", err)
return nil, err
if len(*scaleSetNames) == 0 {
klog.Errorf("ss.GetVMSetNames - No scale sets found for nodes in the cluster, node count(%d)", len(nodes))
return nil, fmt.Errorf("No scale sets found for nodes, node count(%d)", len(nodes))
// sort the list to have deterministic selection
if !isAuto {
if serviceVMSetNames == nil || len(serviceVMSetNames) == 0 {
return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
// validate scale set exists
var found bool
for sasx := range serviceVMSetNames {
for asx := range *scaleSetNames {
if strings.EqualFold((*scaleSetNames)[asx], serviceVMSetNames[sasx]) {
found = true
serviceVMSetNames[sasx] = (*scaleSetNames)[asx]
if !found {
klog.Errorf("ss.GetVMSetNames - scale set (%s) in service annotation not found", serviceVMSetNames[sasx])
return nil, fmt.Errorf("scale set (%s) - not found", serviceVMSetNames[sasx])
vmSetNames = &serviceVMSetNames
return vmSetNames, nil
// extractResourceGroupByVMSSNicID extracts the resource group name by vmss nicID.
func extractResourceGroupByVMSSNicID(nicID string) (string, error) {
matches := vmssNicResourceGroupRE.FindStringSubmatch(nicID)
if len(matches) != 2 {
return "", fmt.Errorf("error of extracting resourceGroup from nicID %q", nicID)
return matches[1], nil
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return network.Interface{}, err
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetPrimaryInterface(nodeName)
ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
if err != nil {
// VM is availability set, but not cached yet in availabilitySetNodesCache.
if err == ErrorNotVmssInstance {
return ss.availabilitySet.GetPrimaryInterface(nodeName)
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getVmssVM(%s), err=%v", nodeName, nodeName, err)
return network.Interface{}, err
primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getPrimaryInterfaceID(), err=%v", nodeName, err)
return network.Interface{}, err
nicName, err := getLastSegment(primaryInterfaceID)
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, primaryInterfaceID, err)
return network.Interface{}, err
resourceGroup, err := extractResourceGroupByVMSSNicID(primaryInterfaceID)
if err != nil {
return network.Interface{}, err
ctx, cancel := getContextWithCancel()
defer cancel()
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, resourceGroup, ssName, instanceID, nicName, "")
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, resourceGroup, ssName, nicName, err)
return network.Interface{}, err
// Fix interface's location, which is required when updating the interface.
// TODO: is this a bug of azure SDK?
if nic.Location == nil || *nic.Location == "" {
nic.Location = vm.Location
return nic, nil
// getScaleSet gets scale set with exponential backoff retry
func (ss *scaleSet) getScaleSet(service *v1.Service, name string) (compute.VirtualMachineScaleSet, bool, error) {
if ss.Config.shouldOmitCloudProviderBackoff() {
var result compute.VirtualMachineScaleSet
var exists bool
cached, err := ss.vmssCache.Get(name)
if err != nil {
ss.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", err.Error())
klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", name, err)
return result, false, nil
if cached != nil {
exists = true
result = *(cached.(*compute.VirtualMachineScaleSet))
return result, exists, err
return ss.getScaleSetWithRetry(service, name)
// getScaleSetWithRetry gets scale set with exponential backoff retry
func (ss *scaleSet) getScaleSetWithRetry(service *v1.Service, name string) (compute.VirtualMachineScaleSet, bool, error) {
var result compute.VirtualMachineScaleSet
var exists bool
err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
cached, retryErr := ss.vmssCache.Get(name)
if retryErr != nil {
ss.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error())
klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", name, retryErr)
return false, nil
klog.V(4).Infof("backoff: success for scale set %q", name)
if cached != nil {
exists = true
result = *(cached.(*compute.VirtualMachineScaleSet))
return true, nil
return result, exists, err
// getPrimaryNetworkConfiguration gets primary network interface configuration for scale sets.
func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
networkConfigurations := *networkConfigurationList
if len(networkConfigurations) == 1 {
return &networkConfigurations[0], nil
for idx := range networkConfigurations {
networkConfig := &networkConfigurations[idx]
if networkConfig.Primary != nil && *networkConfig.Primary == true {
return networkConfig, nil
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName)
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
ipConfigurations := *config.IPConfigurations
if len(ipConfigurations) == 1 {
return &ipConfigurations[0], nil
for idx := range ipConfigurations {
ipConfig := &ipConfigurations[idx]
if ipConfig.Primary != nil && *ipConfig.Primary == true {
return ipConfig, nil
return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set %q", scaleSetName)
// createOrUpdateVMSS invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSS(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
if ss.Config.shouldOmitCloudProviderBackoff() {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return ss.processHTTPResponse(service, "CreateOrUpdateVMSS", resp, err)
return ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSSWithRetry(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSS", resp, err)
// updateVMSSInstances invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
func (ss *scaleSet) updateVMSSInstances(service *v1.Service, scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
if ss.Config.shouldOmitCloudProviderBackoff() {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, scaleSetName, vmInstanceIDs)
klog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName)
return ss.processHTTPResponse(service, "CreateOrUpdateVMSSInstance", resp, err)
return ss.updateVMSSInstancesWithRetry(service, scaleSetName, vmInstanceIDs)
// updateVMSSInstancesWithRetry invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
func (ss *scaleSet) updateVMSSInstancesWithRetry(service *v1.Service, scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, scaleSetName, vmInstanceIDs)
klog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSSInstance", resp, err)
// getNodesScaleSets returns scalesets with instanceIDs and standard node names for given nodes.
func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String, []*v1.Node, error) {
scalesets := make(map[string]sets.String)
standardNodes := []*v1.Node{}
for _, curNode := range nodes {
if ss.useStandardLoadBalancer() && ss.excludeMasterNodesFromStandardLB() && isMasterNode(curNode) {
klog.V(4).Infof("Excluding master node %q from load balancer backendpool", curNode.Name)
if ss.ShouldNodeExcludedFromLoadBalancer(curNode) {
klog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", curNode.Name)
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
if err != nil {
klog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name)
standardNodes = append(standardNodes, curNode)
if _, ok := scalesets[curScaleSetName]; !ok {
scalesets[curScaleSetName] = sets.NewString()
instanceID, err := getLastSegment(curNode.Spec.ProviderID)
if err != nil {
klog.Errorf("Failed to get instance ID for node %q: %v", curNode.Spec.ProviderID, err)
return nil, nil, err
return scalesets, standardNodes, nil
// ensureHostsInVMSetPool ensures the given Node's primary IP configurations are
// participating in the vmSet's LoadBalancer Backend Pool.
func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error {
klog.V(3).Infof("ensuring hosts %q of scaleset %q in LB backendpool %q", instanceIDs, vmSetName, backendPoolID)
serviceName := getServiceName(service)
virtualMachineScaleSet, exists, err := ss.getScaleSet(service, vmSetName)
if err != nil {
klog.Errorf("ss.getScaleSet(%s) for service %q failed: %v", vmSetName, serviceName, err)
return err
if !exists {
errorMessage := fmt.Errorf("Scale set %q not found", vmSetName)
klog.Errorf("%v", errorMessage)
return errorMessage
// Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName)
if err != nil {
return err
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName)
if err != nil {
return err
// Update primary IP configuration's LoadBalancerBackendAddressPools.
foundPool := false
newBackendPools := []compute.SubResource{}
if primaryIPConfiguration.LoadBalancerBackendAddressPools != nil {
newBackendPools = *primaryIPConfiguration.LoadBalancerBackendAddressPools
for _, existingPool := range newBackendPools {
if strings.EqualFold(backendPoolID, *existingPool.ID) {
foundPool = true
if !foundPool {
if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 {
// Although standard load balancer supports backends from multiple vmss,
// the same network interface couldn't be added to more than one load balancer of
// the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain
// about this.
for _, pool := range newBackendPools {
backendPool := *pool.ID
matches := backendPoolIDRE.FindStringSubmatch(backendPool)
if len(matches) == 2 {
lbName := matches[1]
if strings.HasSuffix(lbName, InternalLoadBalancerNameSuffix) == isInternal {
klog.V(4).Infof("vmss %q has already been added to LB %q, omit adding it to a new one", vmSetName, lbName)
return nil
newBackendPools = append(newBackendPools,
ID: to.StringPtr(backendPoolID),
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
err := ss.createOrUpdateVMSS(service, virtualMachineScaleSet)
if err != nil {
return err
// Update instances to latest VMSS model.
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
err = ss.updateVMSSInstances(service, vmSetName, vmInstanceIDs)
if err != nil {
return err
return nil
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
serviceName := getServiceName(service)
scalesets, standardNodes, err := ss.getNodesScaleSets(nodes)
if err != nil {
klog.Errorf("getNodesScaleSets() for service %q failed: %v", serviceName, err)
return err
for ssName, instanceIDs := range scalesets {
// Only add nodes belonging to specified vmSet for basic SKU LB.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) {
if instanceIDs.Len() == 0 {
// This may happen when scaling a vmss capacity to 0.
klog.V(3).Infof("scale set %q has 0 nodes, adding it to load balancer anyway", ssName)
// InstanceIDs is required to update vmss, use * instead here since there are no nodes actually.
err := ss.ensureHostsInVMSetPool(service, backendPoolID, ssName, instanceIDs.List(), isInternal)
if err != nil {
klog.Errorf("ensureHostsInVMSetPool() with scaleSet %q for service %q failed: %v", ssName, serviceName, err)
return err
if ss.useStandardLoadBalancer() && len(standardNodes) > 0 {
err := ss.availabilitySet.EnsureHostsInPool(service, standardNodes, backendPoolID, "", isInternal)
if err != nil {
klog.Errorf("availabilitySet.EnsureHostsInPool() for service %q failed: %v", serviceName, err)
return err
return nil
// ensureScaleSetBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified scaleset.
func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(service *v1.Service, poolID, ssName string) error {
klog.V(3).Infof("ensuring backend pool %q deleted from scaleset %q", poolID, ssName)
virtualMachineScaleSet, exists, err := ss.getScaleSet(service, ssName)
if err != nil {
klog.Errorf("ss.ensureScaleSetBackendPoolDeleted(%s, %s) getScaleSet(%s) failed: %v", poolID, ssName, ssName, err)
return err
if !exists {
klog.V(2).Infof("ss.ensureScaleSetBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, ssName, ssName)
return nil
// Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, ssName)
if err != nil {
return err
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, ssName)
if err != nil {
return err
// Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration.
if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 {
return nil
existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools
newBackendPools := []compute.SubResource{}
foundPool := false
for i := len(existingBackendPools) - 1; i >= 0; i-- {
curPool := existingBackendPools[i]
if strings.EqualFold(poolID, *curPool.ID) {
klog.V(10).Infof("ensureScaleSetBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, ssName)
foundPool = true
newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...)
if !foundPool {
// Pool not found, assume it has been already removed.
return nil
// Update scale set with backoff.
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
klog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", ssName)
err = ss.createOrUpdateVMSS(service, virtualMachineScaleSet)
if err != nil {
return err
// Update instances to latest VMSS model.
instanceIDs := []string{"*"}
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
err = ss.updateVMSSInstances(service, ssName, vmInstanceIDs)
if err != nil {
return err
// Update virtualMachineScaleSet again. This is a workaround for removing VMSS reference from LB.
// TODO: remove this workaround when figuring out the root cause.
if len(newBackendPools) == 0 {
err = ss.createOrUpdateVMSS(service, virtualMachineScaleSet)
if err != nil {
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
return nil
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
if backendAddressPools == nil {
return nil
scalesets := sets.NewString()
for _, backendPool := range *backendAddressPools {
if strings.EqualFold(*backendPool.ID, poolID) && backendPool.BackendIPConfigurations != nil {
for _, ipConfigurations := range *backendPool.BackendIPConfigurations {
if ipConfigurations.ID == nil {
ssName, err := extractScaleSetNameByProviderID(*ipConfigurations.ID)
if err != nil {
klog.V(4).Infof("backend IP configuration %q is not belonging to any vmss, omit it", *ipConfigurations.ID)
for ssName := range scalesets {
// Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) {
err := ss.ensureScaleSetBackendPoolDeleted(service, poolID, ssName)
if err != nil {
klog.Errorf("ensureScaleSetBackendPoolDeleted() with scaleSet %q failed: %v", ssName, err)
return err
return nil
// getVmssMachineID returns the full identifier of a vmss virtual machine.
func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string {
return fmt.Sprintf(
马建仓 AI 助手


344bd9b3 5694891 D2dac590 5694891