1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
rest.go 23.98 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package service
import (
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"strconv"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
utilfeature "k8s.io/apiserver/pkg/util/feature"
apiservice "k8s.io/kubernetes/pkg/api/service"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
)
// ServiceRest includes storage for services and all sub resources.
// NewStorage fills both fields from the same underlying REST value.
type ServiceRest struct {
// Service is the storage for the service resource itself.
Service *REST
// Proxy wraps the same REST together with the proxy transport
// (presumably serving the service proxy subresource; confirm against ProxyREST).
Proxy *ProxyREST
}
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
// registry is where Service objects are read, created, updated and deleted.
registry Registry
// endpoints is consulted by ResourceLocation and cleaned up by Delete.
endpoints endpoint.Registry
// serviceIPs allocates and releases cluster IPs for services.
serviceIPs ipallocator.Interface
// serviceNodePorts allocates and releases node ports, including health check node ports.
serviceNodePorts portallocator.Interface
// proxyTransport is the round tripper handed back by ResourceLocation.
proxyTransport http.RoundTripper
}
// ServiceNodePort includes protocol and port number of a service NodePort.
// updateNodePorts compares values of this type with == to detect a
// protocol/node-port pair that appears more than once in a service.
type ServiceNodePort struct {
// The IP protocol for this port. Supports "TCP" and "UDP".
Protocol api.Protocol
// The port on each node on which this service is exposed.
// Default is to auto-allocate a port if the ServiceType of this Service requires one.
NodePort int32
}
// NewStorage builds the service REST storage from its collaborators and
// returns it bundled with a ProxyREST that shares the same REST value and
// proxy transport.
func NewStorage(registry Registry, endpoints endpoint.Registry, serviceIPs ipallocator.Interface,
	serviceNodePorts portallocator.Interface, proxyTransport http.RoundTripper) *ServiceRest {
	svcStorage := new(REST)
	svcStorage.registry = registry
	svcStorage.endpoints = endpoints
	svcStorage.serviceIPs = serviceIPs
	svcStorage.serviceNodePorts = serviceNodePorts
	svcStorage.proxyTransport = proxyTransport

	proxyStorage := &ProxyREST{ServiceRest: svcStorage, ProxyTransport: proxyTransport}
	return &ServiceRest{Service: svcStorage, Proxy: proxyStorage}
}
// ShortNames implements the ShortNamesProvider interface. It reports the
// abbreviated names under which the resource may be addressed.
func (rs *REST) ShortNames() []string {
	shortNames := []string{"svc"}
	return shortNames
}
// Categories implements the CategoriesProvider interface. It reports the
// resource categories that services belong to.
func (rs *REST) Categories() []string {
	categories := []string{"all"}
	return categories
}
// Create runs rest.BeforeCreate on the incoming Service, allocates the cluster
// IP and node ports it needs, and stores it through the registry.
//
// The second return value of initClusterIP is kept in releaseServiceIP; while
// it is true the deferred function releases the cluster IP again, so every
// early return after the allocation gives the IP back. It is reset to false
// only after the registry write succeeded.
//
// Node-port allocations go through nodePortOp; Commit is called only on
// success (presumably Finish rolls back uncommitted allocations; confirm
// against portallocator.PortAllocationOperation).
//
// TODO: implement includeUninitialized by refactoring this to move to store
func (rs *REST) Create(ctx genericapirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) {
service := obj.(*api.Service)
if err := rest.BeforeCreate(Strategy, ctx, obj); err != nil {
return nil, err
}
// TODO: this should probably move to strategy.PrepareForCreate()
releaseServiceIP := false
defer func() {
if releaseServiceIP {
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
var err error
// ExternalName services get no cluster IP.
if service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
return nil, err
}
}
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
defer nodePortOp.Finish()
// Only NodePort and LoadBalancer services receive node ports.
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := rs.initNodePorts(service, nodePortOp); err != nil {
return nil, err
}
}
// Handle ExternalTraffic related fields during service creation.
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
if apiservice.NeedsHealthCheck(service) {
if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil {
return nil, errors.NewInternalError(err)
}
}
if errs := validation.ValidateServiceExternalTrafficFieldsCombination(service); len(errs) > 0 {
return nil, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
}
out, err := rs.registry.CreateService(ctx, service, createValidation)
if err != nil {
// Give the strategy a chance to rewrite the error for generated names.
err = rest.CheckGeneratedNameError(Strategy, err, service)
}
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// these should be caught by an eventual reconciliation / restart
glog.Errorf("error(s) committing service node-ports changes: %v", el)
}
// The service is stored, so the deferred function must keep its cluster IP.
releaseServiceIP = false
}
return out, err
}
// Delete removes the service named id from the registry, deletes the
// endpoints object of the same name, and releases the cluster IP, node ports
// and health check node port that the service held.
//
// Errors from fetching or deleting the service are returned as-is and leave
// all allocations untouched. Once the service has been deleted its
// allocations are always released, even when deleting the endpoints fails;
// that endpoints error (unless it is NotFound) is still returned to the
// caller afterwards. Failures while releasing are reported through
// utilruntime.HandleError and do not fail the call.
func (rs *REST) Delete(ctx genericapirequest.Context, id string) (runtime.Object, error) {
	service, err := rs.registry.GetService(ctx, id, &metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	if err := rs.registry.DeleteService(ctx, id); err != nil {
		return nil, err
	}

	// TODO: can leave dangling endpoints, and potentially return incorrect
	// endpoints if a new service is created with the same name
	//
	// The result is checked only after the allocations below were released:
	// the service no longer exists, so returning early here would leave its
	// cluster IP and node ports allocated with no owner.
	endpointsErr := rs.endpoints.DeleteEndpoints(ctx, id)

	if helper.IsServiceIPSet(service) {
		if err := rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)); err != nil {
			// these should be caught by an eventual reconciliation / restart
			utilruntime.HandleError(fmt.Errorf("Error releasing service %s cluster IP %s: %v", service.Name, service.Spec.ClusterIP, err))
		}
	}

	for _, nodePort := range CollectServiceNodePorts(service) {
		if err := rs.serviceNodePorts.Release(nodePort); err != nil {
			// these should be caught by an eventual reconciliation / restart
			utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", service.Name, nodePort, err))
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
		apiservice.NeedsHealthCheck(service) {
		if nodePort := service.Spec.HealthCheckNodePort; nodePort > 0 {
			if err := rs.serviceNodePorts.Release(int(nodePort)); err != nil {
				// these should be caught by an eventual reconciliation / restart
				utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", service.Name, nodePort, err))
			}
		}
	}

	if endpointsErr != nil && !errors.IsNotFound(endpointsErr) {
		return nil, endpointsErr
	}
	return &metav1.Status{Status: metav1.StatusSuccess}, nil
}
// Get looks up the service named id in the registry and returns it together
// with whatever error the registry reported.
func (rs *REST) Get(ctx genericapirequest.Context, id string, options *metav1.GetOptions) (runtime.Object, error) {
	svc, err := rs.registry.GetService(ctx, id, options)
	return svc, err
}
// List returns the services the registry yields for the given list options.
func (rs *REST) List(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
	services, err := rs.registry.ListServices(ctx, options)
	return services, err
}
// Watch opens a watch on Services through the registry and hands back the
// resulting watch.Interface.
// It implements rest.Watcher.
func (rs *REST) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
	watcher, err := rs.registry.WatchServices(ctx, options)
	return watcher, err
}
// Export asks the registry for the named Service with cluster-specific
// information stripped.
// It implements rest.Exporter.
func (rs *REST) Export(ctx genericapirequest.Context, name string, opts metav1.ExportOptions) (runtime.Object, error) {
	exported, err := rs.registry.ExportService(ctx, name, opts)
	return exported, err
}
// New returns an empty Service object.
func (*REST) New() runtime.Object {
	return new(api.Service)
}
// NewList returns an empty ServiceList object.
func (*REST) NewList() runtime.Object {
	return new(api.ServiceList)
}
// externalTrafficPolicyUpdate adjusts ExternalTrafficPolicy during a service
// update. The field is defaulted differently per service type (NodePort and
// LoadBalancer get a non-empty default, every other type defaults to empty),
// so when a service moves away from NodePort/LoadBalancer the now ineffective
// value is cleared on the updated object.
func externalTrafficPolicyUpdate(oldService, service *api.Service) {
	hadExternalTraffic := false
	switch oldService.Spec.Type {
	case api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
		hadExternalTraffic = true
	}

	hasExternalTraffic := false
	switch service.Spec.Type {
	case api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
		hasExternalTraffic = true
	}

	if !hadExternalTraffic || hasExternalTraffic {
		return
	}
	// Clear ExternalTrafficPolicy to prevent confusion from ineffective field.
	service.Spec.ExternalTrafficPolicy = api.ServiceExternalTrafficPolicyType("")
}
// healthCheckNodePortUpdate reconciles the HealthCheckNodePort of a service
// that is being updated, allocating or releasing the port through nodePortOp
// as required.
//
// It returns (true, nil) when the update may proceed and (false, err) when the
// port could not be allocated or the caller tried to change an existing
// health check node port.
func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service, nodePortOp *portallocator.PortAllocationOperation) (bool, error) {
	hadHealthCheck := apiservice.NeedsHealthCheck(oldService)
	previousPort := oldService.Spec.HealthCheckNodePort
	wantsHealthCheck := apiservice.NeedsHealthCheck(service)
	requestedPort := service.Spec.HealthCheckNodePort

	// The service starts needing a health check node port: allocate one, or
	// reserve the user-specified port when the updated service carries one.
	if !hadHealthCheck && wantsHealthCheck {
		glog.Infof("Transition to LoadBalancer type service with ExternalTrafficPolicy=Local")
		if err := rs.allocateHealthCheckNodePort(service, nodePortOp); err != nil {
			return false, errors.NewInternalError(err)
		}
		return true, nil
	}

	// The service stops needing a health check node port: schedule the old
	// port for release and clear the field.
	if hadHealthCheck && !wantsHealthCheck {
		glog.Infof("Transition to non LoadBalancer type service or LoadBalancer type service with ExternalTrafficPolicy=Global")
		glog.V(4).Infof("Releasing healthCheckNodePort: %d", previousPort)
		nodePortOp.ReleaseDeferred(int(previousPort))
		service.Spec.HealthCheckNodePort = 0
		return true, nil
	}

	// The service keeps needing a health check node port: its value is immutable.
	if hadHealthCheck && wantsHealthCheck && previousPort != requestedPort {
		glog.Warningf("Attempt to change value of health check node port DENIED")
		portPath := field.NewPath("spec", "healthCheckNodePort")
		invalid := field.Invalid(portPath, requestedPort,
			"cannot change healthCheckNodePort on loadBalancer service with externalTraffic=Local during update")
		return false, errors.NewInvalid(api.Kind("Service"), service.Name, field.ErrorList{invalid})
	}

	return true, nil
}
// Update applies objInfo to the stored service called name, adjusts cluster IP,
// node port and health check node port allocations to match the new service
// type, and writes the result through the registry.
//
// The returned bool is always false in this implementation. A namespace
// mismatch yields a Conflict error and failed validation yields an Invalid
// error.
//
// A cluster IP allocated during this call is released again by the deferred
// function unless the registry update succeeded. Node-port changes go through
// nodePortOp and are committed only on success (presumably Finish rolls back
// uncommitted allocations; confirm against portallocator).
func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc) (runtime.Object, bool, error) {
oldService, err := rs.registry.GetService(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
}
obj, err := objInfo.UpdatedObject(ctx, oldService)
if err != nil {
return nil, false, err
}
service := obj.(*api.Service)
if !rest.ValidNamespace(ctx, &service.ObjectMeta) {
return nil, false, errors.NewConflict(api.Resource("services"), service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context"))
}
// Copy over non-user fields
// TODO: make this a merge function
if errs := validation.ValidateServiceUpdate(service, oldService); len(errs) > 0 {
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
// TODO: this should probably move to strategy.PrepareForCreate()
releaseServiceIP := false
defer func() {
if releaseServiceIP {
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
defer nodePortOp.Finish()
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
return nil, false, err
}
}
// Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
// NOTE(review): this release happens immediately, before UpdateService has
// succeeded, and nothing re-allocates the IP if a later step fails; confirm
// that this is intended.
if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
if helper.IsServiceIPSet(oldService) {
rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP))
}
}
// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
rs.releaseNodePorts(oldService, nodePortOp)
}
// Update service from any type to NodePort or LoadBalancer, should update NodePort.
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := rs.updateNodePorts(oldService, service, nodePortOp); err != nil {
return nil, false, err
}
}
// Update service from LoadBalancer to non-LoadBalancer, should remove any LoadBalancerStatus.
if service.Spec.Type != api.ServiceTypeLoadBalancer {
// Although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity.
service.Status.LoadBalancer = api.LoadBalancerStatus{}
}
// Handle ExternalTraffic related updates.
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
success, err := rs.healthCheckNodePortUpdate(oldService, service, nodePortOp)
if !success || err != nil {
return nil, false, err
}
externalTrafficPolicyUpdate(oldService, service)
if errs := validation.ValidateServiceExternalTrafficFieldsCombination(service); len(errs) > 0 {
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
}
out, err := rs.registry.UpdateService(ctx, service, createValidation, updateValidation)
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// problems should be fixed by an eventual reconciliation / restart
glog.Errorf("error(s) committing NodePorts changes: %v", el)
}
// The update is stored, so the deferred function must keep the cluster IP.
releaseServiceIP = false
}
return out, false, err
}
// Compile-time check that *REST implements rest.Redirector.
var _ rest.Redirector = (*REST)(nil)
// ResourceLocation resolves a service reference to the URL of one of its
// endpoints and returns it with the proxy transport.
//
// id may be "svcname", "svcname:port" or "scheme:svcname:port". A numeric port
// is first translated to the name of the matching service port. The endpoint
// subsets are then scanned from a random starting position for a port with
// that name, and a random address of the first matching subset is used.
// Malformed ids produce a BadRequest error; an unknown port number or a lack
// of usable endpoints produces a ServiceUnavailable error.
func (rs *REST) ResourceLocation(ctx genericapirequest.Context, id string) (*url.URL, http.RoundTripper, error) {
	svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
	if !valid {
		return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
	}

	// A port given as a number is replaced by the declared name of that service port.
	portNum, parseErr := strconv.ParseInt(portStr, 10, 64)
	if parseErr == nil {
		svc, err := rs.registry.GetService(ctx, svcName, &metav1.GetOptions{})
		if err != nil {
			return nil, nil, err
		}
		matched := false
		for i := range svc.Spec.Ports {
			if int64(svc.Spec.Ports[i].Port) != portNum {
				continue
			}
			portStr = svc.Spec.Ports[i].Name
			matched = true
			break
		}
		if !matched {
			return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
		}
	}

	eps, err := rs.endpoints.GetEndpoints(ctx, svcName, &metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}
	subsetCount := len(eps.Subsets)
	if subsetCount == 0 {
		return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
	}

	// Walk every subset once, beginning at a random one.
	start := rand.Intn(subsetCount)
	for offset := 0; offset < subsetCount; offset++ {
		subset := &eps.Subsets[(start+offset)%subsetCount]
		if len(subset.Addresses) == 0 {
			continue
		}
		for _, epPort := range subset.Ports {
			if epPort.Name != portStr {
				continue
			}
			// Pick a random address.
			addr := subset.Addresses[rand.Intn(len(subset.Addresses))]
			location := &url.URL{
				Scheme: svcScheme,
				Host:   net.JoinHostPort(addr.IP, strconv.Itoa(int(epPort.Port))),
			}
			return location, rs.proxyTransport, nil
		}
	}
	return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
// containsNumber reports whether needle occurs in haystack.
// The scan is linear; haystack is expected to be small enough that this is
// faster than anything more elaborate.
func containsNumber(haystack []int, needle int) bool {
	for i := 0; i < len(haystack); i++ {
		if haystack[i] == needle {
			return true
		}
	}
	return false
}
// containsNodePort reports whether serviceNodePort (compared by protocol and
// port number) occurs in serviceNodePorts.
// The scan is linear; the slice is expected to be small enough that this is
// faster than anything more elaborate.
func containsNodePort(serviceNodePorts []ServiceNodePort, serviceNodePort ServiceNodePort) bool {
	for i := 0; i < len(serviceNodePorts); i++ {
		if serviceNodePorts[i] == serviceNodePort {
			return true
		}
	}
	return false
}
// CollectServiceNodePorts returns the non-zero NodePort values of all ports of
// the service, in declaration order. The result is an empty, non-nil slice
// when no port has a node port assigned.
func CollectServiceNodePorts(service *api.Service) []int {
	nodePorts := make([]int, 0)
	for _, port := range service.Spec.Ports {
		if port.NodePort == 0 {
			continue
		}
		nodePorts = append(nodePorts, int(port.NodePort))
	}
	return nodePorts
}
// findRequestedNodePort returns the NodePort of the first entry in
// servicePorts that has the given port number and a non-zero NodePort.
// It returns 0 when no such entry exists.
func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
	for _, candidate := range servicePorts {
		if candidate.NodePort == 0 || int(candidate.Port) != port {
			continue
		}
		return int(candidate.NodePort)
	}
	return 0
}
// allocateHealthCheckNodePort gives the service a health check node port.
// A non-zero Spec.HealthCheckNodePort is treated as a request for that exact
// port; otherwise the next port offered by nodePortOp is taken and written
// back into the service. An error is returned when the allocation fails.
func (rs *REST) allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
	if requested := service.Spec.HealthCheckNodePort; requested != 0 {
		// The request names a specific port: try to reserve exactly that one.
		if err := nodePortOp.Allocate(int(requested)); err != nil {
			return fmt.Errorf("failed to allocate requested HealthCheck NodePort %v: %v",
				requested, err)
		}
		glog.V(4).Infof("Reserved user requested healthCheckNodePort: %d", requested)
		return nil
	}

	// No port was requested: take whichever one the allocator offers next.
	allocated, err := nodePortOp.AllocateNext()
	if err != nil {
		return fmt.Errorf("failed to allocate a HealthCheck NodePort %v: %v", allocated, err)
	}
	service.Spec.HealthCheckNodePort = int32(allocated)
	glog.V(4).Infof("Reserved allocated healthCheckNodePort: %d", allocated)
	return nil
}
// initClusterIP allocates the cluster IP of the service. An empty
// Spec.ClusterIP gets the next free address written into it, an explicit
// address is reserved as requested, and api.ClusterIPNone allocates nothing.
// The returned bool reports whether a cluster IP was allocated successfully.
func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
	requested := service.Spec.ClusterIP

	if requested == api.ClusterIPNone {
		return false, nil
	}

	if requested == "" {
		// Allocate next available.
		ip, err := rs.serviceIPs.AllocateNext()
		if err != nil {
			// TODO: what error should be returned here? It's not a
			// field-level validation failure (the field is valid), and it's
			// not really an internal error.
			return false, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err))
		}
		service.Spec.ClusterIP = ip.String()
		return true, nil
	}

	// Try to respect the requested IP.
	if err := rs.serviceIPs.Allocate(net.ParseIP(requested)); err != nil {
		// TODO: when validation becomes versioned, this gets more complicated.
		invalid := field.Invalid(field.NewPath("spec", "clusterIP"), requested, err.Error())
		return false, errors.NewInvalid(api.Kind("Service"), service.Name, field.ErrorList{invalid})
	}
	return true, nil
}
// initNodePorts assigns a node port to every port of the service through
// nodePortOp. Entries that share a service port number share one node port
// unless an entry explicitly asks for a different one.
//
// It returns an Invalid error when a requested node port cannot be allocated
// and an InternalError when no node port could be auto-allocated.
func (rs *REST) initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
// svcPortToNodePort remembers, per service port number, the node port that
// was already assigned during this call.
svcPortToNodePort := map[int]int{}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
allocatedNodePort := svcPortToNodePort[int(servicePort.Port)]
if allocatedNodePort == 0 {
// This will only scan forward in the service.Spec.Ports list because any matches
// before the current port would have been found in svcPortToNodePort. This is really
// looking for any user provided values.
np := findRequestedNodePort(int(servicePort.Port), service.Spec.Ports)
if np != 0 {
// Some entry with this port number requested a node port: reserve exactly that one.
err := nodePortOp.Allocate(np)
if err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), np, err.Error())}
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
servicePort.NodePort = int32(np)
svcPortToNodePort[int(servicePort.Port)] = np
} else {
// Nothing was requested for this port number: take the next free node port.
nodePort, err := nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePort)
svcPortToNodePort[int(servicePort.Port)] = nodePort
}
} else if int(servicePort.NodePort) != allocatedNodePort {
// This port number already has a node port, but this entry differs from it.
// TODO(xiangpengzhao): do we need to allocate a new NodePort in this case?
// Note: the current implementation is better, because it saves a NodePort.
if servicePort.NodePort == 0 {
// Unset: reuse the node port already assigned to this port number.
servicePort.NodePort = int32(allocatedNodePort)
} else {
// Explicitly different: reserve the node port this entry asks for.
err := nodePortOp.Allocate(int(servicePort.NodePort))
if err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())}
return errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
}
}
return nil
}
// updateNodePorts brings the node ports of newService in line with its spec
// during an update: it allocates node ports that oldService did not hold,
// auto-allocates ports left at zero, and schedules node ports that are no
// longer used for deferred release.
//
// It returns an Invalid error when a requested node port cannot be allocated,
// an InternalError when auto-allocation fails, and a plain error when the same
// protocol/node-port pair appears twice in newService.
func (rs *REST) updateNodePorts(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
oldNodePortsNumbers := CollectServiceNodePorts(oldService)
// newNodePorts collects the protocol/node-port pairs seen so far, for duplicate detection.
newNodePorts := []ServiceNodePort{}
// portAllocated marks node ports explicitly allocated during this call so
// that a number used by several entries is only allocated once.
portAllocated := map[int]bool{}
for i := range newService.Spec.Ports {
servicePort := &newService.Spec.Ports[i]
nodePort := ServiceNodePort{Protocol: servicePort.Protocol, NodePort: servicePort.NodePort}
if nodePort.NodePort != 0 {
// An explicit node port needs allocating only if the old service did not
// already hold it and it was not allocated earlier in this loop.
if !containsNumber(oldNodePortsNumbers, int(nodePort.NodePort)) && !portAllocated[int(nodePort.NodePort)] {
err := nodePortOp.Allocate(int(nodePort.NodePort))
if err != nil {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort.NodePort, err.Error())}
return errors.NewInvalid(api.Kind("Service"), newService.Name, el)
}
portAllocated[int(nodePort.NodePort)] = true
}
} else {
// No node port given: take the next free one and record it on the service.
nodePortNumber, err := nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePortNumber)
nodePort.NodePort = servicePort.NodePort
}
if containsNodePort(newNodePorts, nodePort) {
return fmt.Errorf("duplicate nodePort: %v", nodePort)
}
newNodePorts = append(newNodePorts, nodePort)
}
newNodePortsNumbers := CollectServiceNodePorts(newService)
// The comparison loops are O(N^2), but we don't expect N to be huge
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
for _, oldNodePortNumber := range oldNodePortsNumbers {
if containsNumber(newNodePortsNumbers, oldNodePortNumber) {
continue
}
// The old node port is not used by the new service any more.
nodePortOp.ReleaseDeferred(int(oldNodePortNumber))
}
return nil
}
// releaseNodePorts schedules every node port held by the service for deferred
// release on nodePortOp.
func (rs *REST) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
	for _, heldPort := range CollectServiceNodePorts(service) {
		nodePortOp.ReleaseDeferred(heldPort)
	}
}
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.9.1

搜索帮助