1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
openstack_loadbalancer.go 30.26 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openstack
import (
"time"
"github.com/rackspace/gophercloud"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
"github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
v2_monitors "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
v2_pools "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
neutron_ports "github.com/rackspace/gophercloud/openstack/networking/v2/ports"
"github.com/rackspace/gophercloud/pagination"
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/cloudprovider"
)
// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
const loadbalancerActiveTimeoutSeconds = 120
const loadbalancerDeleteTimeoutSeconds = 30
// LoadBalancer implementation for LBaaS v1
type LbaasV1 struct {
LoadBalancer
}
// LoadBalancer implementation for LBaaS v2
type LbaasV2 struct {
LoadBalancer
}
type empty struct{}
func networkExtensions(client *gophercloud.ServiceClient) (map[string]bool, error) {
seen := make(map[string]bool)
pager := extensions.List(client)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
exts, err := extensions.ExtractExtensions(page)
if err != nil {
return false, err
}
for _, ext := range exts {
seen[ext.Alias] = true
}
return true, nil
})
return seen, err
}
func getPortIDByIP(client *gophercloud.ServiceClient, ipAddress string) (string, error) {
var portID string
err := neutron_ports.List(client, neutron_ports.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
portList, err := neutron_ports.ExtractPorts(page)
if err != nil {
return false, err
}
for _, port := range portList {
for _, ip := range port.FixedIPs {
if ip.IPAddress == ipAddress {
portID = port.ID
return false, nil
}
}
}
return true, nil
})
return portID, err
}
func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
opts := floatingips.ListOpts{
PortID: portID,
}
pager := floatingips.List(client, opts)
floatingIPList := make([]floatingips.FloatingIP, 0, 1)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
f, err := floatingips.ExtractFloatingIPs(page)
if err != nil {
return false, err
}
floatingIPList = append(floatingIPList, f...)
if len(floatingIPList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(floatingIPList) == 0 {
return nil, ErrNotFound
} else if len(floatingIPList) > 1 {
return nil, ErrMultipleResults
}
return &floatingIPList[0], nil
}
func getPoolByName(client *gophercloud.ServiceClient, name string) (*pools.Pool, error) {
opts := pools.ListOpts{
Name: name,
}
pager := pools.List(client, opts)
poolList := make([]pools.Pool, 0, 1)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
p, err := pools.ExtractPools(page)
if err != nil {
return false, err
}
poolList = append(poolList, p...)
if len(poolList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(poolList) == 0 {
return nil, ErrNotFound
} else if len(poolList) > 1 {
return nil, ErrMultipleResults
}
return &poolList[0], nil
}
func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) {
opts := vips.ListOpts{
Name: name,
}
pager := vips.List(client, opts)
vipList := make([]vips.VirtualIP, 0, 1)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
v, err := vips.ExtractVIPs(page)
if err != nil {
return false, err
}
vipList = append(vipList, v...)
if len(vipList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(vipList) == 0 {
return nil, ErrNotFound
} else if len(vipList) > 1 {
return nil, ErrMultipleResults
}
return &vipList[0], nil
}
func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
opts := loadbalancers.ListOpts{
Name: name,
}
pager := loadbalancers.List(client, opts)
loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
v, err := loadbalancers.ExtractLoadbalancers(page)
if err != nil {
return false, err
}
loadbalancerList = append(loadbalancerList, v...)
if len(loadbalancerList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(loadbalancerList) == 0 {
return nil, ErrNotFound
} else if len(loadbalancerList) > 1 {
return nil, ErrMultipleResults
}
return &loadbalancerList[0], nil
}
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
start := time.Now().Second()
for {
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
if err != nil {
return "", err
}
if loadbalancer.ProvisioningStatus == "ACTIVE" {
return "ACTIVE", nil
} else if loadbalancer.ProvisioningStatus == "ERROR" {
return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state")
}
time.Sleep(1 * time.Second)
if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
}
}
}
func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
start := time.Now().Second()
for {
_, err := loadbalancers.Get(client, loadbalancerID).Extract()
if err != nil {
if err == ErrNotFound {
return nil
} else {
return err
}
}
time.Sleep(1 * time.Second)
if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds {
return fmt.Errorf("Loadbalancer failed to delete within the alloted time")
}
}
}
func (lbaas *LbaasV2) GetLoadBalancer(clusterName string, service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
if err == ErrNotFound {
return nil, false, nil
}
if loadbalancer == nil {
return nil, false, err
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
return status, true, err
}
// TODO: This code currently ignores 'region' and always creates a
// loadbalancer in only the current OpenStack region. We should take
// a list of regions (from config) and query/create loadbalancers in
// each region.
func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)
ports := apiService.Spec.Ports
if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
// Check for TCP protocol on each port
// TODO: Convert all error messages to use an event recorder
for _, port := range ports {
if port.Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
}
affinity := api.ServiceAffinityNone //apiService.Spec.SessionAffinity
var persistence *v2_pools.SessionPersistence
switch affinity {
case api.ServiceAffinityNone:
persistence = nil
case api.ServiceAffinityClientIP:
persistence = &v2_pools.SessionPersistence{Type: "SOURCE_IP"}
default:
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
}
sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
if err != nil {
return nil, err
}
if !service.IsAllowAll(sourceRanges) {
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
}
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
_, exists, err := lbaas.GetLoadBalancer(clusterName, apiService)
if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
}
}
lbmethod := v2_pools.LBMethod(lbaas.opts.LBMethod)
if lbmethod == "" {
lbmethod = v2_pools.LBMethodRoundRobin
}
name := cloudprovider.GetLoadBalancerName(apiService)
createOpts := loadbalancers.CreateOpts{
Name: name,
Description: fmt.Sprintf("Kubernetes external service %s", name),
VipSubnetID: lbaas.opts.SubnetId,
}
loadBalancerIP := apiService.Spec.LoadBalancerIP
if loadBalancerIP != "" {
createOpts.VipAddress = loadBalancerIP
}
loadbalancer, err := loadbalancers.Create(lbaas.network, createOpts).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
for portIndex, port := range ports {
listener, err := listeners.Create(lbaas.network, listeners.CreateOpts{
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
pool, err := v2_pools.Create(lbaas.network, v2_pools.CreateOpts{
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
Protocol: v2_pools.Protocol(port.Protocol),
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
_, err = v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
if lbaas.opts.CreateMonitor {
_, err = v2_monitors.Create(lbaas.network, v2_monitors.CreateOpts{
PoolID: pool.ID,
Type: string(port.Protocol),
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
if lbaas.opts.FloatingNetworkId != "" {
portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
floatIPOpts := floatingips.CreateOpts{
FloatingNetworkID: lbaas.opts.FloatingNetworkId,
PortID: portID,
}
floatIP, err := floatingips.Create(lbaas.network, floatIPOpts).Extract()
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
return nil, err
}
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
}
return status, nil
}
func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *api.Service, hosts []string) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, hosts)
ports := service.Spec.Ports
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
if err != nil {
return err
}
if loadbalancer == nil {
return fmt.Errorf("Loadbalancer %s does not exist", loadBalancerName)
}
// Get all listeners for this loadbalancer, by "port key".
type portKey struct {
Protocol string
Port int
}
lbListeners := make(map[portKey]listeners.Listener)
err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
listenersList, err := listeners.ExtractListeners(page)
if err != nil {
return false, err
}
for _, l := range listenersList {
key := portKey{Protocol: l.Protocol, Port: l.ProtocolPort}
lbListeners[key] = l
}
return true, nil
})
if err != nil {
return err
}
// Get all pools for this loadbalancer, by listener ID.
lbPools := make(map[string]v2_pools.Pool)
err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
poolsList, err := v2_pools.ExtractPools(page)
if err != nil {
return false, err
}
for _, p := range poolsList {
for _, l := range p.Listeners {
lbPools[l.ID] = p
}
}
return true, nil
})
if err != nil {
return err
}
// Compose Set of member (addresses) that _should_ exist
addrs := map[string]empty{}
for _, host := range hosts {
addr, err := getAddressByName(lbaas.compute, host)
if err != nil {
return err
}
addrs[addr] = empty{}
}
// Check for adding/removing members associated with each port
for _, port := range ports {
// Get listener associated with this port
listener, ok := lbListeners[portKey{
Protocol: string(port.Protocol),
Port: int(port.Port),
}]
if !ok {
return fmt.Errorf("Loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
}
// Get pool associated with this listener
pool, ok := lbPools[listener.ID]
if !ok {
return fmt.Errorf("Loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
}
// Find existing pool members (by address) for this port
members := make(map[string]v2_pools.Member)
err := v2_pools.ListAssociateMembers(lbaas.network, pool.ID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}
for _, member := range membersList {
members[member.Address] = member
}
return true, nil
})
if err != nil {
return err
}
// Add any new members for this port
for addr := range addrs {
if _, ok := members[addr]; ok {
// Already exists, do not create member
continue
}
_, err := v2_pools.CreateAssociateMember(lbaas.network, pool.ID, v2_pools.MemberCreateOpts{
Address: addr,
ProtocolPort: int(port.NodePort),
SubnetID: lbaas.opts.SubnetId,
}).Extract()
if err != nil {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// Remove any old members for this port
for _, member := range members {
if _, ok := addrs[member.Address]; ok {
// Still present, do not delete member
continue
}
err = v2_pools.DeleteMember(lbaas.network, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
}
return nil
}
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(clusterName string, service *api.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)
loadbalancer, err := getLoadbalancerByName(lbaas.network, loadBalancerName)
if err != nil && err != ErrNotFound {
return err
}
if loadbalancer == nil {
return nil
}
if lbaas.opts.FloatingNetworkId != "" && loadbalancer != nil {
portID, err := getPortIDByIP(lbaas.network, loadbalancer.VipAddress)
if err != nil {
return err
}
floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
if err != nil && err != ErrNotFound {
return err
}
if floatingIP != nil {
err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
}
}
// get all listeners associated with this loadbalancer
var listenerIDs []string
err = listeners.List(lbaas.network, listeners.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
listenerList, err := listeners.ExtractListeners(page)
if err != nil {
return false, err
}
for _, listener := range listenerList {
listenerIDs = append(listenerIDs, listener.ID)
}
return true, nil
})
if err != nil {
return err
}
// get all pools associated with this loadbalancer
var poolIDs []string
err = v2_pools.List(lbaas.network, v2_pools.ListOpts{LoadbalancerID: loadbalancer.ID}).EachPage(func(page pagination.Page) (bool, error) {
poolsList, err := v2_pools.ExtractPools(page)
if err != nil {
return false, err
}
for _, pool := range poolsList {
poolIDs = append(poolIDs, pool.ID)
}
return true, nil
})
if err != nil {
return err
}
// get all members associated with each poolIDs
var memberIDs []string
for _, poolID := range poolIDs {
err := v2_pools.ListAssociateMembers(lbaas.network, poolID, v2_pools.MemberListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2_pools.ExtractMembers(page)
if err != nil {
return false, err
}
for _, member := range membersList {
memberIDs = append(memberIDs, member.ID)
}
return true, nil
})
if err != nil {
return err
}
}
// get all monitors associated with each poolIDs
var monitorIDs []string
for _, poolID := range poolIDs {
err = v2_monitors.List(lbaas.network, v2_monitors.ListOpts{PoolID: poolID}).EachPage(func(page pagination.Page) (bool, error) {
monitorsList, err := v2_monitors.ExtractMonitors(page)
if err != nil {
return false, err
}
for _, monitor := range monitorsList {
monitorIDs = append(monitorIDs, monitor.ID)
}
return true, nil
})
if err != nil {
return err
}
}
// delete all monitors
for _, monitorID := range monitorIDs {
err := v2_monitors.Delete(lbaas.network, monitorID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// delete all members and pools
for _, poolID := range poolIDs {
// delete all members for this pool
for _, memberID := range memberIDs {
err := v2_pools.DeleteMember(lbaas.network, poolID, memberID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// delete pool
err := v2_pools.Delete(lbaas.network, poolID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// delete all listeners
for _, listenerID := range listenerIDs {
err := listeners.Delete(lbaas.network, listenerID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
}
// delete loadbalancer
err = loadbalancers.Delete(lbaas.network, loadbalancer.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
waitLoadbalancerDeleted(lbaas.network, loadbalancer.ID)
return nil
}
func (lb *LbaasV1) GetLoadBalancer(clusterName string, service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
vip, err := getVipByName(lb.network, loadBalancerName)
if err == ErrNotFound {
return nil, false, nil
}
if vip == nil {
return nil, false, err
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
return status, true, err
}
// TODO: This code currently ignores 'region' and always creates a
// loadbalancer in only the current OpenStack region. We should take
// a list of regions (from config) and query/create loadbalancers in
// each region.
func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *api.Service, hosts []string) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, apiService.Annotations)
ports := apiService.Spec.Ports
if len(ports) > 1 {
return nil, fmt.Errorf("multiple ports are not supported in openstack v1 load balancers")
} else if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
// TODO: Convert all error messages to use an event recorder
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
affinity := apiService.Spec.SessionAffinity
var persistence *vips.SessionPersistence
switch affinity {
case api.ServiceAffinityNone:
persistence = nil
case api.ServiceAffinityClientIP:
persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
default:
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
}
sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
if err != nil {
return nil, err
}
if !service.IsAllowAll(sourceRanges) {
return nil, fmt.Errorf("Source range restrictions are not supported for openstack load balancers")
}
glog.V(2).Infof("Checking if openstack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
_, exists, err := lb.GetLoadBalancer(clusterName, apiService)
if err != nil {
return nil, fmt.Errorf("error checking if openstack load balancer already exists: %v", err)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
// In particular, if we implement hosts update, we can get rid of UpdateHosts
if exists {
err := lb.EnsureLoadBalancerDeleted(clusterName, apiService)
if err != nil {
return nil, fmt.Errorf("error deleting existing openstack load balancer: %v", err)
}
}
lbmethod := lb.opts.LBMethod
if lbmethod == "" {
lbmethod = pools.LBMethodRoundRobin
}
name := cloudprovider.GetLoadBalancerName(apiService)
pool, err := pools.Create(lb.network, pools.CreateOpts{
Name: name,
Protocol: pools.ProtocolTCP,
SubnetID: lb.opts.SubnetId,
LBMethod: lbmethod,
}).Extract()
if err != nil {
return nil, err
}
for _, host := range hosts {
addr, err := getAddressByName(lb.compute, host)
if err != nil {
return nil, err
}
_, err = members.Create(lb.network, members.CreateOpts{
PoolID: pool.ID,
ProtocolPort: int(ports[0].NodePort), //Note: only handles single port
Address: addr,
}).Extract()
if err != nil {
pools.Delete(lb.network, pool.ID)
return nil, err
}
}
var mon *monitors.Monitor
if lb.opts.CreateMonitor {
mon, err = monitors.Create(lb.network, monitors.CreateOpts{
Type: monitors.TypeTCP,
Delay: int(lb.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lb.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lb.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
pools.Delete(lb.network, pool.ID)
return nil, err
}
_, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract()
if err != nil {
monitors.Delete(lb.network, mon.ID)
pools.Delete(lb.network, pool.ID)
return nil, err
}
}
createOpts := vips.CreateOpts{
Name: name,
Description: fmt.Sprintf("Kubernetes external service %s", name),
Protocol: "TCP",
ProtocolPort: int(ports[0].Port), //TODO: need to handle multi-port
PoolID: pool.ID,
SubnetID: lb.opts.SubnetId,
Persistence: persistence,
}
loadBalancerIP := apiService.Spec.LoadBalancerIP
if loadBalancerIP != "" {
createOpts.Address = loadBalancerIP
}
vip, err := vips.Create(lb.network, createOpts).Extract()
if err != nil {
if mon != nil {
monitors.Delete(lb.network, mon.ID)
}
pools.Delete(lb.network, pool.ID)
return nil, err
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: vip.Address}}
if lb.opts.FloatingNetworkId != "" {
floatIPOpts := floatingips.CreateOpts{
FloatingNetworkID: lb.opts.FloatingNetworkId,
PortID: vip.PortID,
}
floatIP, err := floatingips.Create(lb.network, floatIPOpts).Extract()
if err != nil {
return nil, err
}
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: floatIP.FloatingIP})
}
return status, nil
}
func (lb *LbaasV1) UpdateLoadBalancer(clusterName string, service *api.Service, hosts []string) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, hosts)
vip, err := getVipByName(lb.network, loadBalancerName)
if err != nil {
return err
}
// Set of member (addresses) that _should_ exist
addrs := map[string]bool{}
for _, host := range hosts {
addr, err := getAddressByName(lb.compute, host)
if err != nil {
return err
}
addrs[addr] = true
}
// Iterate over members that _do_ exist
pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID})
err = pager.EachPage(func(page pagination.Page) (bool, error) {
memList, err := members.ExtractMembers(page)
if err != nil {
return false, err
}
for _, member := range memList {
if _, found := addrs[member.Address]; found {
// Member already exists
delete(addrs, member.Address)
} else {
// Member needs to be deleted
err = members.Delete(lb.network, member.ID).ExtractErr()
if err != nil {
return false, err
}
}
}
return true, nil
})
if err != nil {
return err
}
// Anything left in addrs is a new member that needs to be added
for addr := range addrs {
_, err := members.Create(lb.network, members.CreateOpts{
PoolID: vip.PoolID,
Address: addr,
ProtocolPort: vip.ProtocolPort,
}).Extract()
if err != nil {
return err
}
}
return nil
}
func (lb *LbaasV1) EnsureLoadBalancerDeleted(clusterName string, service *api.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)
vip, err := getVipByName(lb.network, loadBalancerName)
if err != nil && err != ErrNotFound {
return err
}
if lb.opts.FloatingNetworkId != "" && vip != nil {
floatingIP, err := getFloatingIPByPortID(lb.network, vip.PortID)
if err != nil && !isNotFound(err) {
return err
}
if floatingIP != nil {
err = floatingips.Delete(lb.network, floatingIP.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
}
}
// We have to delete the VIP before the pool can be deleted,
// so no point continuing if this fails.
if vip != nil {
err := vips.Delete(lb.network, vip.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
}
var pool *pools.Pool
if vip != nil {
pool, err = pools.Get(lb.network, vip.PoolID).Extract()
if err != nil && !isNotFound(err) {
return err
}
} else {
// The VIP is gone, but it is conceivable that a Pool
// still exists that we failed to delete on some
// previous occasion. Make a best effort attempt to
// cleanup any pools with the same name as the VIP.
pool, err = getPoolByName(lb.network, service.Name)
if err != nil && err != ErrNotFound {
return err
}
}
if pool != nil {
for _, monId := range pool.MonitorIDs {
_, err = pools.DisassociateMonitor(lb.network, pool.ID, monId).Extract()
if err != nil {
return err
}
err = monitors.Delete(lb.network, monId).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
}
err = pools.Delete(lb.network, pool.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
}
return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.4.6-beta.0

搜索帮助