/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"bytes"
"fmt"
"net"
"sort"
"strconv"
"strings"
"time"
"k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
azurecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
// KubeProxyLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
// to not notice a Service update, such as type=NodePort.
// TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
// liberal. Fix tracked in #20567.
KubeProxyLagTimeout = 5 * time.Minute
// KubeProxyEndpointLagTimeout is the maximum time a kube-proxy daemon on a node is allowed
// to not notice an Endpoint update.
KubeProxyEndpointLagTimeout = 30 * time.Second
// LoadBalancerLagTimeoutDefault is the maximum time a load balancer is allowed to
// not respond after creation.
LoadBalancerLagTimeoutDefault = 2 * time.Minute
// LoadBalancerLagTimeoutAWS is the delay between ELB creation and serving traffic
// on AWS. A few minutes is typical, so use 10m.
LoadBalancerLagTimeoutAWS = 10 * time.Minute
// How long to wait for a load balancer to be created/modified.
//TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
LoadBalancerCreateTimeoutDefault = 20 * time.Minute
LoadBalancerCreateTimeoutLarge = 2 * time.Hour
// Time required by the load balancer to clean up, proportional to numApps/Ing.
// Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
LoadBalancerCleanupTimeout = 15 * time.Minute
// On average it takes ~6 minutes for a single backend to come online in GCE.
LoadBalancerPollTimeout = 15 * time.Minute
LoadBalancerPollInterval = 30 * time.Second
LargeClusterMinNodesNumber = 100
// Don't test with more than 3 nodes.
// Many tests create an endpoint per node; in large clusters this is
// resource- and time-intensive.
MaxNodesForEndpointsTests = 3
// ServiceTestTimeout is used for most polling/waiting activities
ServiceTestTimeout = 60 * time.Second
// GCPMaxInstancesInInstanceGroup is the maximum number of instances supported in
// one instance group on GCP.
GCPMaxInstancesInInstanceGroup = 2000
// AffinityConfirmCount is the number of needed continuous requests to confirm that
// affinity is enabled.
AffinityConfirmCount = 15
)
// This should match whatever the default/configured range is
var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
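// For illustration (an added note, not in the original file): with the default
// kube-apiserver --service-node-port-range this covers ports 30000-32767, so:
//
//	ServiceNodePortRange.Contains(30000) // true (Base)
//	ServiceNodePortRange.Contains(32767) // true (Base + Size - 1)
//	ServiceNodePortRange.Contains(32768) // false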
// A test jig to help service testing.
type ServiceTestJig struct {
ID string
Name string
Client clientset.Interface
Labels map[string]string
}
// NewServiceTestJig allocates and inits a new ServiceTestJig.
func NewServiceTestJig(client clientset.Interface, name string) *ServiceTestJig {
j := &ServiceTestJig{}
j.Client = client
j.Name = name
j.ID = j.Name + "-" + string(uuid.NewUUID())
j.Labels = map[string]string{"testid": j.ID}
return j
}
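// Example usage (an illustrative sketch, not part of the original file): a
// test typically creates one jig per service under test; `cs` (a
// clientset.Interface) and `ns` (the test namespace) are assumed to come from
// the surrounding e2e framework:
//
//	jig := NewServiceTestJig(cs, "my-service")
//	svc := jig.CreateTCPServiceOrFail(ns, nil) // ClusterIP service on port 80
//	jig.SanityCheckService(svc, v1.ServiceTypeClusterIP)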
// newServiceTemplate returns the default v1.Service template for this jig, but
// does not actually create the Service. The default Service has the same name
// as the jig and exposes the given port.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto v1.Protocol, port int32) *v1.Service {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: j.Name,
Labels: j.Labels,
},
Spec: v1.ServiceSpec{
Selector: j.Labels,
Ports: []v1.ServicePort{
{
Protocol: proto,
Port: port,
},
},
},
}
return service
}
// CreateTCPServiceWithPort creates a new TCP Service with given port based on the
// jig's defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceWithPort(namespace string, tweak func(svc *v1.Service), port int32) *v1.Service {
svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, port)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
if err != nil {
Failf("Failed to create TCP Service %q: %v", svc.Name, err)
}
return result
}
// CreateTCPServiceOrFail creates a new TCP Service based on the jig's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
svc := j.newServiceTemplate(namespace, v1.ProtocolTCP, 80)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
if err != nil {
Failf("Failed to create TCP Service %q: %v", svc.Name, err)
}
return result
}
// CreateUDPServiceOrFail creates a new UDP Service based on the jig's
// defaults. Callers can provide a function to tweak the Service object before
// it is created.
func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
svc := j.newServiceTemplate(namespace, v1.ProtocolUDP, 80)
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
if err != nil {
Failf("Failed to create UDP Service %q: %v", svc.Name, err)
}
return result
}
// CreateExternalNameServiceOrFail creates a new ExternalName type Service based on the jig's defaults.
// Callers can provide a function to tweak the Service object before it is created.
func (j *ServiceTestJig) CreateExternalNameServiceOrFail(namespace string, tweak func(svc *v1.Service)) *v1.Service {
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: j.Name,
Labels: j.Labels,
},
Spec: v1.ServiceSpec{
Selector: j.Labels,
ExternalName: "foo.example.com",
Type: v1.ServiceTypeExternalName,
},
}
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(namespace).Create(svc)
if err != nil {
Failf("Failed to create ExternalName Service %q: %v", svc.Name, err)
}
return result
}
// CreateServiceWithServicePort creates a new Service with ServicePort.
func (j *ServiceTestJig) CreateServiceWithServicePort(labels map[string]string, namespace string, ports []v1.ServicePort) (*v1.Service, error) {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: j.Name,
},
Spec: v1.ServiceSpec{
Selector: labels,
Ports: ports,
},
}
return j.Client.CoreV1().Services(namespace).Create(service)
}
func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.ServiceType, timeout time.Duration) {
ingressIP := ""
svc := j.UpdateServiceOrFail(namespace, name, func(s *v1.Service) {
for _, ing := range s.Status.LoadBalancer.Ingress {
if ing.IP != "" {
ingressIP = ing.IP
}
}
s.Spec.Type = newType
s.Spec.Ports[0].NodePort = 0
})
if ingressIP != "" {
j.WaitForLoadBalancerDestroyOrFail(namespace, svc.Name, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
}
}
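// Example usage (an illustrative sketch, not in the original file): demoting a
// LoadBalancer service back to ClusterIP and waiting for the cloud resources
// to be destroyed; `cs`, `jig`, and `ns` are assumed from the test context:
//
//	timeout := GetServiceLoadBalancerCreationTimeout(cs)
//	jig.ChangeServiceType(ns, "my-service", v1.ServiceTypeClusterIP, timeout)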
// CreateOnlyLocalNodePortService creates a NodePort service with
// ExternalTrafficPolicy set to Local and sanity checks its nodePort.
// If createPod is true, it also creates an RC with 1 replica of
// the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: 80}}
})
if createPod {
By("creating a pod to be part of the service " + serviceName)
j.RunOrFail(namespace, nil)
}
j.SanityCheckService(svc, v1.ServiceTypeNodePort)
return svc
}
// CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
// ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
// If createPod is true, it also creates an RC with 1 replica of
// the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
tweak func(svc *v1.Service)) *v1.Service {
By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
if tweak != nil {
tweak(svc)
}
})
if createPod {
By("creating a pod to be part of the service " + serviceName)
j.RunOrFail(namespace, nil)
}
By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
return svc
}
// CreateLoadBalancerService creates a loadbalancer service and waits
// for it to acquire an ingress IP.
func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
if tweak != nil {
tweak(svc)
}
})
By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
return svc
}
func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
for j := range node.Status.Addresses {
nodeAddress := &node.Status.Addresses[j]
if nodeAddress.Type == addressType {
ips = append(ips, nodeAddress.Address)
}
}
return
}
func CollectAddresses(nodes *v1.NodeList, addressType v1.NodeAddressType) []string {
ips := []string{}
for i := range nodes.Items {
ips = append(ips, GetNodeAddresses(&nodes.Items[i], addressType)...)
}
return ips
}
func GetNodePublicIps(c clientset.Interface) ([]string, error) {
nodes := GetReadySchedulableNodesOrDie(c)
ips := CollectAddresses(nodes, v1.NodeExternalIP)
if len(ips) == 0 {
// If ExternalIP isn't set, assume the test programs can reach the InternalIP
ips = CollectAddresses(nodes, v1.NodeInternalIP)
}
return ips, nil
}
func PickNodeIP(c clientset.Interface) string {
publicIps, err := GetNodePublicIps(c)
Expect(err).NotTo(HaveOccurred())
if len(publicIps) == 0 {
Failf("got unexpected number (%d) of public IPs", len(publicIps))
}
ip := publicIps[0]
return ip
}
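// Example usage (an illustrative sketch, not in the original file): combining
// the node-IP helpers with the reachability helpers below to probe a NodePort
// service from outside the cluster; `cs`, `jig`, and `svc` are assumed:
//
//	nodeIP := PickNodeIP(cs)
//	nodePort := int(svc.Spec.Ports[0].NodePort)
//	jig.TestReachableHTTP(nodeIP, nodePort, KubeProxyLagTimeout)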
// GetEndpointNodes returns a map from node name to external IP addresses for
// the nodes on which the endpoints of the given Service are running.
func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
nodes := j.GetNodes(MaxNodesForEndpointsTests)
endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
if err != nil {
Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
}
if len(endpoints.Subsets) == 0 {
Failf("Endpoint has no subsets, cannot determine node addresses.")
}
epNodes := sets.NewString()
for _, ss := range endpoints.Subsets {
for _, e := range ss.Addresses {
if e.NodeName != nil {
epNodes.Insert(*e.NodeName)
}
}
}
nodeMap := map[string][]string{}
for _, n := range nodes.Items {
if epNodes.Has(n.Name) {
nodeMap[n.Name] = GetNodeAddresses(&n, v1.NodeExternalIP)
}
}
return nodeMap
}
// GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
// where we don't want to e.g. create an endpoint per node.
func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
nodes = GetReadySchedulableNodesOrDie(j.Client)
if len(nodes.Items) <= maxNodesForTest {
maxNodesForTest = len(nodes.Items)
}
nodes.Items = nodes.Items[:maxNodesForTest]
return nodes
}
func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
nodes := j.GetNodes(maxNodesForTest)
nodesNames := []string{}
for _, node := range nodes.Items {
nodesNames = append(nodesNames, node.Name)
}
return nodesNames
}
func (j *ServiceTestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
err := wait.PollImmediate(Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
endpoints, err := j.Client.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
if err != nil {
Logf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
return false, nil
}
if len(endpoints.Subsets) == 0 {
Logf("Expect endpoints with subsets, got none.")
return false, nil
}
// TODO: Handle multiple endpoints
if len(endpoints.Subsets[0].Addresses) == 0 {
Logf("Expected Ready endpoints - found none")
return false, nil
}
epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
Logf("Pod for service %s/%s is on node %s", namespace, serviceName, epHostName)
if epHostName != nodeName {
Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
return false, nil
}
return true, nil
})
ExpectNoError(err)
}
func (j *ServiceTestJig) SanityCheckService(svc *v1.Service, svcType v1.ServiceType) {
if svc.Spec.Type != svcType {
Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
}
if svcType != v1.ServiceTypeExternalName {
if svc.Spec.ExternalName != "" {
Failf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
}
if svc.Spec.ClusterIP != api.ClusterIPNone && svc.Spec.ClusterIP == "" {
Failf("didn't get ClusterIP for non-ExternamName service")
}
} else {
if svc.Spec.ClusterIP != "" {
Failf("unexpected Spec.ClusterIP (%s) for ExternamName service, expected empty", svc.Spec.ClusterIP)
}
}
expectNodePorts := false
if svcType != v1.ServiceTypeClusterIP && svcType != v1.ServiceTypeExternalName {
expectNodePorts = true
}
for i, port := range svc.Spec.Ports {
hasNodePort := (port.NodePort != 0)
if hasNodePort != expectNodePorts {
Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
}
if hasNodePort {
if !ServiceNodePortRange.Contains(int(port.NodePort)) {
Failf("out-of-range nodePort (%d) for service", port.NodePort)
}
}
}
expectIngress := false
if svcType == v1.ServiceTypeLoadBalancer {
expectIngress = true
}
hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
if hasIngress != expectIngress {
Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
}
if hasIngress {
for i, ing := range svc.Status.LoadBalancer.Ingress {
if ing.IP == "" && ing.Hostname == "" {
Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
}
}
}
}
// UpdateService fetches a service, calls the update function on it, and
// then attempts to send the updated service. It tries up to 3 times in the
// face of timeouts and conflicts.
func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*v1.Service)) (*v1.Service, error) {
for i := 0; i < 3; i++ {
service, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Failed to get Service %q: %v", name, err)
}
update(service)
service, err = j.Client.CoreV1().Services(namespace).Update(service)
if err == nil {
return service, nil
}
if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
return nil, fmt.Errorf("Failed to update Service %q: %v", name, err)
}
}
return nil, fmt.Errorf("Too many retries updating Service %q", name)
}
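// Example usage (an illustrative sketch, not in the original file): callers
// mutate only the fields they care about inside the update closure, so the
// retry loop above can re-apply the change to a freshly fetched object after
// a conflict:
//
//	svc, err := jig.UpdateService(ns, "my-service", func(s *v1.Service) {
//		s.Spec.SessionAffinity = v1.ServiceAffinityClientIP
//	})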
// UpdateServiceOrFail fetches a service, calls the update function on it, and
// then attempts to send the updated service. It tries up to 3 times in the
// face of timeouts and conflicts.
func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*v1.Service)) *v1.Service {
svc, err := j.UpdateService(namespace, name, update)
if err != nil {
Failf(err.Error())
}
return svc
}
func (j *ServiceTestJig) WaitForNewIngressIPOrFail(namespace, name, existingIP string, timeout time.Duration) *v1.Service {
Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, name)
service := j.waitForConditionOrFail(namespace, name, timeout, "have a new ingress IP", func(svc *v1.Service) bool {
if len(svc.Status.LoadBalancer.Ingress) == 0 {
return false
}
ip := svc.Status.LoadBalancer.Ingress[0].IP
if ip == "" || ip == existingIP {
return false
}
return true
})
return service
}
func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *v1.Service {
var err error
var service *v1.Service
for i := 1; i < ServiceNodePortRange.Size; i++ {
offs1 := initial - ServiceNodePortRange.Base
offs2 := (offs1 + i) % ServiceNodePortRange.Size
newPort := ServiceNodePortRange.Base + offs2
service, err = j.UpdateService(namespace, name, func(s *v1.Service) {
s.Spec.Ports[0].NodePort = int32(newPort)
})
if err != nil && strings.Contains(err.Error(), "provided port is already allocated") {
Logf("tried nodePort %d, but it is in use, will try another", newPort)
continue
}
// Otherwise err was nil or err was a real error
break
}
if err != nil {
Failf("Could not change the nodePort: %v", err)
}
return service
}
func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *v1.Service {
Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
service := j.waitForConditionOrFail(namespace, name, timeout, "have a load balancer", func(svc *v1.Service) bool {
if len(svc.Status.LoadBalancer.Ingress) > 0 {
return true
}
return false
})
return service
}
func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *v1.Service {
// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
defer func() {
if err := EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
}
}()
Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
service := j.waitForConditionOrFail(namespace, name, timeout, "have no load balancer", func(svc *v1.Service) bool {
if len(svc.Status.LoadBalancer.Ingress) == 0 {
return true
}
return false
})
return service
}
func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) *v1.Service {
var service *v1.Service
pollFunc := func() (bool, error) {
svc, err := j.Client.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
if conditionFn(svc) {
service = svc
return true, nil
}
return false, nil
}
if err := wait.PollImmediate(Poll, timeout, pollFunc); err != nil {
Failf("Timed out waiting for service %q to %s", name, message)
}
return service
}
// newRCTemplate returns the default v1.ReplicationController object for
// this jig, but does not actually create the RC. The default RC has the same
// name as the jig and runs the "netexec" container.
func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController {
var replicas int32 = 1
rc := &v1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: j.Name,
Labels: j.Labels,
},
Spec: v1.ReplicationControllerSpec{
Replicas: &replicas,
Selector: j.Labels,
Template: &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: j.Labels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "netexec",
Image: imageutils.GetE2EImage(imageutils.Netexec),
Args: []string{"--http-port=80", "--udp-port=80"},
ReadinessProbe: &v1.Probe{
PeriodSeconds: 3,
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromInt(80),
Path: "/hostName",
},
},
},
},
},
TerminationGracePeriodSeconds: new(int64),
},
},
},
}
return rc
}
func (j *ServiceTestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
var replicas int32 = 2
rc.Spec.Replicas = &replicas
if rc.Spec.Template.Spec.Affinity == nil {
rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
}
if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
}
rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
v1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
Namespaces: nil,
TopologyKey: "kubernetes.io/hostname",
})
}
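// Example usage (an illustrative sketch, not in the original file): spreading
// the backend pods across nodes before creating a PDB, via RunOrFail's tweak
// hook:
//
//	rc := jig.RunOrFail(ns, func(rc *v1.ReplicationController) {
//		jig.AddRCAntiAffinity(rc) // bumps replicas to 2, one pod per node
//	})
//	jig.CreatePDBOrFail(ns, rc) // PDB with MinAvailable = replicas - 1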
func (j *ServiceTestJig) CreatePDBOrFail(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
pdb := j.newPDBTemplate(namespace, rc)
newPdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Create(pdb)
if err != nil {
Failf("Failed to create PDB %q %v", pdb.Name, err)
}
if err := j.waitForPdbReady(namespace); err != nil {
Failf("Failed waiting for PDB to be ready: %v", err)
}
return newPdb
}
// newPDBTemplate returns the default policyv1beta1.PodDisruptionBudget object for
// this jig, but does not actually create the PDB. The default PDB specifies a
// MinAvailable of N-1 and matches the pods created by the RC.
func (j *ServiceTestJig) newPDBTemplate(namespace string, rc *v1.ReplicationController) *policyv1beta1.PodDisruptionBudget {
minAvailable := intstr.FromInt(int(*rc.Spec.Replicas) - 1)
pdb := &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: j.Name,
Labels: j.Labels,
},
Spec: policyv1beta1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{MatchLabels: j.Labels},
},
}
return pdb
}
// RunOrFail creates a ReplicationController and Pod(s) and waits for the
// Pod(s) to be running. Callers can provide a function to tweak the RC object
// before it is created.
func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.ReplicationController)) *v1.ReplicationController {
rc := j.newRCTemplate(namespace)
if tweak != nil {
tweak(rc)
}
result, err := j.Client.CoreV1().ReplicationControllers(namespace).Create(rc)
if err != nil {
Failf("Failed to create RC %q: %v", rc.Name, err)
}
pods, err := j.waitForPodsCreated(namespace, int(*(rc.Spec.Replicas)))
if err != nil {
Failf("Failed to create pods: %v", err)
}
if err := j.waitForPodsReady(namespace, pods); err != nil {
Failf("Failed waiting for pods to be running: %v", err)
}
return result
}
func (j *ServiceTestJig) waitForPdbReady(namespace string) error {
timeout := 2 * time.Minute
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
pdb, err := j.Client.PolicyV1beta1().PodDisruptionBudgets(namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
return err
}
if pdb.Status.PodDisruptionsAllowed > 0 {
return nil
}
}
return fmt.Errorf("Timeout waiting for PDB %q to be ready", j.Name)
}
func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
timeout := 2 * time.Minute
// List the pods, making sure we observe all the replicas.
label := labels.SelectorFromSet(labels.Set(j.Labels))
Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
options := metav1.ListOptions{LabelSelector: label.String()}
pods, err := j.Client.CoreV1().Pods(namespace).List(options)
if err != nil {
return nil, err
}
found := []string{}
for _, pod := range pods.Items {
if pod.DeletionTimestamp != nil {
continue
}
found = append(found, pod.Name)
}
if len(found) == replicas {
Logf("Found all %d pods", replicas)
return found, nil
}
Logf("Found %d/%d pods - will retry", len(found), replicas)
}
return nil, fmt.Errorf("Timeout waiting for %d pods to be created", replicas)
}
func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
timeout := 2 * time.Minute
if !CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
return fmt.Errorf("Timeout waiting for %d pods to be ready", len(pods))
}
return nil
}
// newNetexecPodSpec returns the pod spec of the netexec pod
func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "netexec",
Image: NetexecImageName,
Command: []string{
"/netexec",
fmt.Sprintf("--http-port=%d", httpPort),
fmt.Sprintf("--udp-port=%d", udpPort),
},
Ports: []v1.ContainerPort{
{
Name: "http",
ContainerPort: httpPort,
},
{
Name: "udp",
ContainerPort: udpPort,
},
},
},
},
HostNetwork: hostNetwork,
},
}
return pod
}
func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
pod.Spec.NodeName = nodeName
pod.ObjectMeta.Labels = j.Labels
podClient := f.ClientSet.CoreV1().Pods(f.Namespace.Name)
_, err := podClient.Create(pod)
ExpectNoError(err)
ExpectNoError(f.WaitForPodRunning(podName))
Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}
// newEchoServerPodSpec returns the pod spec of the echo server pod
func newEchoServerPodSpec(podName string) *v1.Pod {
port := 8080
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "echoserver",
Image: imageutils.GetE2EImage(imageutils.EchoServer),
Ports: []v1.ContainerPort{{ContainerPort: int32(port)}},
},
},
RestartPolicy: v1.RestartPolicyNever,
},
}
return pod
}
// LaunchEchoserverPodOnNode launches a pod serving http on port 8080 to act
// as the target for the source IP preservation test. The client's source IP
// is echoed back by the web server.
func (j *ServiceTestJig) LaunchEchoserverPodOnNode(f *Framework, nodeName, podName string) {
Logf("Creating echo server pod %q in namespace %q", podName, f.Namespace.Name)
pod := newEchoServerPodSpec(podName)
pod.Spec.NodeName = nodeName
pod.ObjectMeta.Labels = j.Labels
podClient := f.ClientSet.CoreV1().Pods(f.Namespace.Name)
_, err := podClient.Create(pod)
ExpectNoError(err)
ExpectNoError(f.WaitForPodRunning(podName))
Logf("Echo server pod %q in namespace %q running", pod.Name, f.Namespace.Name)
}
func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
j.TestReachableHTTPWithRetriableErrorCodes(host, port, []int{}, timeout)
}
func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) {
return TestReachableHTTPWithRetriableErrorCodes(host, port, "/echo?msg=hello", "hello", retriableErrCodes)
}); err != nil {
if err == wait.ErrWaitTimeout {
Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout)
} else {
Failf("Failed to reach HTTP service through %v:%v: %v", host, port, err)
}
}
}
func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableHTTP(host, port) }); err != nil {
Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableUDP(host, port, "echo hello", "hello") }); err != nil {
Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableUDP(host, port, "echo hello") }); err != nil {
Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer
var err error
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
var result bool
result, err = TestReachableHTTPWithContent(host, port, url, "", &body)
if err != nil {
Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err)
return false, nil
}
return result, nil
}); pollErr != nil {
Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err)
}
return body
}
func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
url := fmt.Sprintf("http://%s%s", ipPort, request)
if ip == "" || port == 0 {
Failf("Got empty IP for reachability check (%s)", url)
return false, fmt.Errorf("Invalid input ip or port")
}
Logf("Testing HTTP health check on %v", url)
resp, err := httpGetNoConnectionPool(url)
if err != nil {
Logf("Got error testing for reachability of %s: %v", url, err)
return false, err
}
defer resp.Body.Close()
// HealthCheck responder returns 503 for no local endpoints
if resp.StatusCode == 503 {
return false, nil
}
// HealthCheck responder returns 200 for non-zero local endpoints
if resp.StatusCode == 200 {
return true, nil
}
return false, fmt.Errorf("Unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
}
func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
count := 0
condition := func() (bool, error) {
success, _ := testHTTPHealthCheckNodePort(host, port, request)
if success && expectSucceed ||
!success && !expectSucceed {
count++
}
if count >= threshold {
return true, nil
}
return false, nil
}
if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v%v, got %d", threshold, expectSucceed, host, port, count)
}
return nil
}
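// Example usage (an illustrative sketch, not in the original file): asserting
// that a node with no local endpoints fails the health check of an
// ExternalTrafficPolicy=Local service; `nodeIP` and `svc` are assumed:
//
//	hcPort := int(svc.Spec.HealthCheckNodePort)
//	err := jig.TestHTTPHealthCheckNodePort(nodeIP, hcPort, "/healthz",
//		KubeProxyEndpointLagTimeout, false /*expectSucceed*/, 5 /*threshold*/)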
// ServiceTestFixture is a simple helper to avoid too much boilerplate in tests
type ServiceTestFixture struct {
ServiceName string
Namespace string
Client clientset.Interface
TestId string
Labels map[string]string
rcs map[string]bool
services map[string]bool
Name string
Image string
}
func NewServerTest(client clientset.Interface, namespace string, serviceName string) *ServiceTestFixture {
t := &ServiceTestFixture{}
t.Client = client
t.Namespace = namespace
t.ServiceName = serviceName
t.TestId = t.ServiceName + "-" + string(uuid.NewUUID())
t.Labels = map[string]string{
"testid": t.TestId,
}
t.rcs = make(map[string]bool)
t.services = make(map[string]bool)
t.Name = "webserver"
t.Image = imageutils.GetE2EImage(imageutils.TestWebserver)
return t
}
// Build default config for a service (which can then be changed)
func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: t.ServiceName,
Namespace: t.Namespace,
},
Spec: v1.ServiceSpec{
Selector: t.Labels,
Ports: []v1.ServicePort{{
Port: 80,
TargetPort: intstr.FromInt(80),
}},
},
}
return service
}
// CreateWebserverRC creates rc-backed pods with the well-known webserver
// configuration and records it for cleanup.
func (t *ServiceTestFixture) CreateWebserverRC(replicas int32) *v1.ReplicationController {
rcSpec := RcByNamePort(t.Name, replicas, t.Image, 80, v1.ProtocolTCP, t.Labels, nil)
rcAct, err := t.CreateRC(rcSpec)
if err != nil {
Failf("Failed to create rc %s: %v", rcSpec.Name, err)
}
if err := VerifyPods(t.Client, t.Namespace, t.Name, false, replicas); err != nil {
Failf("Failed to create %d pods with name %s: %v", replicas, t.Name, err)
}
return rcAct
}
// CreateRC creates a replication controller and records it for cleanup.
func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.ReplicationController, error) {
rc, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Create(rc)
if err == nil {
t.rcs[rc.Name] = true
}
return rc, err
}
// Create a service, and record it for cleanup
func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) {
result, err := t.Client.CoreV1().Services(t.Namespace).Create(service)
if err == nil {
t.services[service.Name] = true
}
return result, err
}
// Delete a service, and remove it from the cleanup list
func (t *ServiceTestFixture) DeleteService(serviceName string) error {
err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
if err == nil {
delete(t.services, serviceName)
}
return err
}
func (t *ServiceTestFixture) Cleanup() []error {
var errs []error
for rcName := range t.rcs {
By("stopping RC " + rcName + " in namespace " + t.Namespace)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// First, resize the RC to 0.
old, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
x := int32(0)
old.Spec.Replicas = &x
if _, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Update(old); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return nil
})
if err != nil {
errs = append(errs, err)
}
// TODO(mikedanese): Wait.
// Then, delete the RC altogether.
if err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Delete(rcName, nil); err != nil {
if !errors.IsNotFound(err) {
errs = append(errs, err)
}
}
}
for serviceName := range t.services {
By("deleting service " + serviceName + " in namespace " + t.Namespace)
err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
if err != nil {
if !errors.IsNotFound(err) {
errs = append(errs, err)
}
}
}
return errs
}
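// Example usage (an illustrative sketch, not in the original file): the
// fixture's intended lifecycle, with Cleanup deferred so RCs and services are
// removed even if the test fails; `cs` and `ns` are assumed:
//
//	t := NewServerTest(cs, ns, "my-service")
//	defer func() {
//		for _, err := range t.Cleanup() {
//			Logf("cleanup error: %v", err)
//		}
//	}()
//	t.CreateWebserverRC(2)
//	svc, err := t.CreateService(t.BuildServiceSpec())
//	ExpectNoError(err)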
func GetIngressPoint(ing *v1.LoadBalancerIngress) string {
host := ing.IP
if host == "" {
host = ing.Hostname
}
return host
}
// UpdateService fetches a service, calls the update function on it,
// and then attempts to send the updated service. It tries up to 3
// times in the face of timeouts and conflicts.
func UpdateService(c clientset.Interface, namespace, serviceName string, update func(*v1.Service)) (*v1.Service, error) {
var service *v1.Service
var err error
for i := 0; i < 3; i++ {
service, err = c.CoreV1().Services(namespace).Get(serviceName, metav1.GetOptions{})
if err != nil {
return service, err
}
update(service)
service, err = c.CoreV1().Services(namespace).Update(service)
if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
return service, err
}
}
return service, err
}
func GetContainerPortsByPodUID(endpoints *v1.Endpoints) PortsByPodUID {
m := PortsByPodUID{}
for _, ss := range endpoints.Subsets {
for _, port := range ss.Ports {
for _, addr := range ss.Addresses {
containerPort := port.Port
if _, ok := m[addr.TargetRef.UID]; !ok {
m[addr.TargetRef.UID] = make([]int, 0)
}
m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
}
}
}
return m
}
type PortsByPodName map[string][]int
type PortsByPodUID map[types.UID][]int
func translatePodNameToUIDOrFail(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) PortsByPodUID {
portsByUID := make(PortsByPodUID)
for name, portList := range expectedEndpoints {
pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{})
if err != nil {
Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
}
portsByUID[pod.ObjectMeta.UID] = portList
}
// Logf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)
return portsByUID
}
func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) {
if len(endpoints) != len(expectedEndpoints) {
// should not happen because we check this condition before
Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints)
}
for podUID := range expectedEndpoints {
if _, ok := endpoints[podUID]; !ok {
Failf("endpoint %v not found", podUID)
}
if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) {
Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
}
sort.Ints(endpoints[podUID])
sort.Ints(expectedEndpoints[podUID])
for index := range endpoints[podUID] {
if endpoints[podUID][index] != expectedEndpoints[podUID][index] {
Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
}
}
}
}
func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) {
By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
i := 1
for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) {
endpoints, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
if err != nil {
Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
continue
}
// Logf("Found endpoints %v", endpoints)
portsByPodUID := GetContainerPortsByPodUID(endpoints)
// Logf("Found port by pod UID %v", portsByPodUID)
expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints)
if len(portsByPodUID) == len(expectedEndpoints) {
validatePortsOrFail(portsByPodUID, expectedPortsByPodUID)
Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
serviceName, namespace, expectedEndpoints, time.Since(start))
return
}
if i%5 == 0 {
Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
}
i++
}
if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}); err == nil {
for _, pod := range pods.Items {
Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
}
} else {
Logf("Can't list pod debug info: %v", err)
}
Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout)
}
// StartServeHostnameService creates a replication controller that serves its
// hostname and a service on top of it.
func StartServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
podNames := make([]string, replicas)
name := svc.ObjectMeta.Name
By("creating service " + name + " in namespace " + ns)
_, err := c.CoreV1().Services(ns).Create(svc)
if err != nil {
return podNames, "", err
}
var createdPods []*v1.Pod
maxContainerFailures := 0
config := testutils.RCConfig{
Client: c,
InternalClient: internalClient,
Image: ServeHostnameImage,
Name: name,
Namespace: ns,
PollInterval: 3 * time.Second,
Timeout: PodReadyBeforeTimeout,
Replicas: replicas,
CreatedPods: &createdPods,
MaxContainerFailures: &maxContainerFailures,
}
err = RunRC(config)
if err != nil {
return podNames, "", err
}
if len(createdPods) != replicas {
return podNames, "", fmt.Errorf("Incorrect number of running pods: %v", len(createdPods))
}
for i := range createdPods {
podNames[i] = createdPods[i].ObjectMeta.Name
}
sort.StringSlice(podNames).Sort()
service, err := c.CoreV1().Services(ns).Get(name, metav1.GetOptions{})
if err != nil {
return podNames, "", err
}
if service.Spec.ClusterIP == "" {
return podNames, "", fmt.Errorf("Service IP is blank for %v", name)
}
serviceIP := service.Spec.ClusterIP
return podNames, serviceIP, nil
}
func StopServeHostnameService(clientset clientset.Interface, ns, name string) error {
if err := DeleteRCAndWaitForGC(clientset, ns, name); err != nil {
return err
}
if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil {
return err
}
return nil
}
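// Example usage (an illustrative sketch, not in the original file): the
// start/verify/stop helpers are normally used together; `cs`, `internalCS`,
// `ns`, and an SSH-able `host` are assumed from the surrounding test:
//
//	svc := CreateServiceSpec("serve-hostname", "", false, map[string]string{"name": "serve-hostname"})
//	podNames, serviceIP, err := StartServeHostnameService(cs, internalCS, svc, ns, 3)
//	ExpectNoError(err)
//	ExpectNoError(VerifyServeHostnameServiceUp(cs, ns, host, podNames, serviceIP, 80))
//	ExpectNoError(StopServeHostnameService(cs, ns, "serve-hostname"))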
// VerifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
// given host and from within a pod. The host is expected to be an SSH-able node
// in the cluster. Each pod in the service is expected to echo its name. These
// names are compared with the given expectedPods list after a sort | uniq.
func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
execPodName := CreateExecPodOrFail(c, ns, "execpod-", nil)
defer func() {
DeletePodOrFail(c, ns, execPodName)
}()
// Loop a bunch of times - the proxy is randomized, so we want a good
// chance of hitting each backend at least once.
buildCommand := func(wget string) string {
serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
50*len(expectedPods), wget, serviceIPPort)
}
commands := []func() string{
// verify service from node
func() string {
cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
Logf("Executing cmd %q on host %v", cmd, host)
result, err := SSH(cmd, host, TestContext.Provider)
if err != nil || result.Code != 0 {
LogSSHResult(result)
Logf("error while SSH-ing to node: %v", err)
}
return result.Stdout
},
// verify service from pod
func() string {
cmd := buildCommand("wget -q -T 1 -O -")
Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
output, err := RunHostCmd(ns, execPodName, cmd)
if err != nil {
Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
}
return output
},
}
expectedEndpoints := sets.NewString(expectedPods...)
By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
for _, cmdFunc := range commands {
passed := false
gotEndpoints := sets.NewString()
// Retry cmdFunc for a while
for start := time.Now(); time.Since(start) < KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
for _, endpoint := range strings.Split(cmdFunc(), "\n") {
trimmedEp := strings.TrimSpace(endpoint)
if trimmedEp != "" {
gotEndpoints.Insert(trimmedEp)
}
}
// TODO: simply checking that the retrieved endpoints are a superset
// of the expected ones allows us to ignore intermittent network flakes that
// result in output like "wget timed out", but these should be rare
// and we need a better way to track how often they occur.
if gotEndpoints.IsSuperset(expectedEndpoints) {
if !gotEndpoints.Equal(expectedEndpoints) {
Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
}
passed = true
break
}
Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
}
if !passed {
// Sort the lists so they're easier to visually diff.
exp := expectedEndpoints.List()
got := gotEndpoints.List()
sort.StringSlice(exp).Sort()
sort.StringSlice(got).Sort()
return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
}
}
return nil
}
func VerifyServeHostnameServiceDown(c clientset.Interface, host string, serviceIP string, servicePort int) error {
ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
// The current versions of curl included in CentOS and RHEL distros
// misinterpret square brackets around IPv6 as globbing, so use the -g
// argument to disable globbing to handle the IPv6 case.
command := fmt.Sprintf(
"curl -g -s --connect-timeout 2 http://%s && exit 99", ipPort)
for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
result, err := SSH(command, host, TestContext.Provider)
if err != nil {
LogSSHResult(result)
Logf("error while SSH-ing to node: %v", err)
}
if result.Code != 99 {
return nil
}
Logf("service still alive - still waiting")
}
return fmt.Errorf("waiting for service to be down timed out")
}
func CleanupServiceResources(c clientset.Interface, loadBalancerName, region, zone string) {
if TestContext.Provider == "gce" || TestContext.Provider == "gke" {
CleanupServiceGCEResources(c, loadBalancerName, region, zone)
}
// TODO: we need to add this function with other cloud providers, if there is a need.
}
func CleanupServiceGCEResources(c clientset.Interface, loadBalancerName, region, zone string) {
if pollErr := wait.Poll(5*time.Second, LoadBalancerCleanupTimeout, func() (bool, error) {
if err := CleanupGCEResources(c, loadBalancerName, region, zone); err != nil {
Logf("Still waiting for glbc to cleanup: %v", err)
return false, nil
}
return true, nil
}); pollErr != nil {
Failf("Failed to cleanup service GCE resources.")
}
}
func DescribeSvc(ns string) {
Logf("\nOutput of kubectl describe svc:\n")
desc, _ := RunKubectl(
"describe", "svc", fmt.Sprintf("--namespace=%v", ns))
Logf(desc)
}
func CreateServiceSpec(serviceName, externalName string, isHeadless bool, selector map[string]string) *v1.Service {
headlessService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
},
Spec: v1.ServiceSpec{
Selector: selector,
},
}
if externalName != "" {
headlessService.Spec.Type = v1.ServiceTypeExternalName
headlessService.Spec.ExternalName = externalName
} else {
headlessService.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: "TCP"},
}
}
if isHeadless {
headlessService.Spec.ClusterIP = "None"
}
return headlessService
}
// EnableAndDisableInternalLB returns two functions for enabling and disabling the internal load balancer
// setting for the supported cloud providers: GCE/GKE and Azure
func EnableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(svc *v1.Service)) {
enable = func(svc *v1.Service) {}
disable = func(svc *v1.Service) {}
switch TestContext.Provider {
case "gce", "gke":
enable = func(svc *v1.Service) {
svc.ObjectMeta.Annotations = map[string]string{gcecloud.ServiceAnnotationLoadBalancerType: string(gcecloud.LBTypeInternal)}
}
disable = func(svc *v1.Service) {
delete(svc.ObjectMeta.Annotations, gcecloud.ServiceAnnotationLoadBalancerType)
}
case "azure":
enable = func(svc *v1.Service) {
svc.ObjectMeta.Annotations = map[string]string{azurecloud.ServiceAnnotationLoadBalancerInternal: "true"}
}
disable = func(svc *v1.Service) {
svc.ObjectMeta.Annotations = map[string]string{azurecloud.ServiceAnnotationLoadBalancerInternal: "false"}
}
}
return
}
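// Example usage (an illustrative sketch, not in the original file): switching
// a service to an internal load balancer on whichever provider the test runs
// against, keeping the disable closure around to revert later:
//
//	enableILB, disableILB := EnableAndDisableInternalLB()
//	svc := jig.UpdateServiceOrFail(ns, "my-service", func(s *v1.Service) {
//		enableILB(s)
//	})
//	// ... later: jig.UpdateServiceOrFail(ns, "my-service", disableILB)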
func GetServiceLoadBalancerCreationTimeout(cs clientset.Interface) time.Duration {
if nodes := GetReadySchedulableNodesOrDie(cs); len(nodes.Items) > LargeClusterMinNodesNumber {
return LoadBalancerCreateTimeoutLarge
}
return LoadBalancerCreateTimeoutDefault
}
// affinityTracker tracks the destination of a request for the affinity tests.
type affinityTracker struct {
hostTrace []string
}
// recordHost records the host that served a response.
func (at *affinityTracker) recordHost(host string) {
at.hostTrace = append(at.hostTrace, host)
}
// checkHostTrace reports whether at least count requests were recorded (fulfilled)
// and whether the most recent count requests all went to the same host (affinityHolds).
func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
fulfilled = (len(at.hostTrace) >= count)
if len(at.hostTrace) == 0 {
return fulfilled, true
}
last := at.hostTrace[0:]
if len(at.hostTrace)-count >= 0 {
last = at.hostTrace[len(at.hostTrace)-count:]
}
host := at.hostTrace[len(at.hostTrace)-1]
for _, h := range last {
if h != host {
return fulfilled, false
}
}
return fulfilled, true
}
func checkAffinityFailed(tracker affinityTracker, err string) {
Logf("%v", tracker.hostTrace)
Failf(err)
}
// CheckAffinity tests whether a service's affinity works as expected.
// If affinity is expected and transitionState is true, the check returns true
// once AffinityConfirmCount identical responses are observed in a row. If
// affinity is not expected, the check keeps observing until differing
// responses are seen, and returns false only when no expected response is
// observed before the timeout. If transitionState is false and shouldHold is
// true, the check fails as soon as a different host is returned.
func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, targetPort int, shouldHold, transitionState bool) bool {
targetIpPort := net.JoinHostPort(targetIp, strconv.Itoa(targetPort))
cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIpPort)
timeout := ServiceTestTimeout
if execPod == nil {
timeout = LoadBalancerPollTimeout
}
var tracker affinityTracker
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
if execPod != nil {
if stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
Logf("Failed to get response from %s. Retry until timeout", targetIpPort)
return false, nil
} else {
tracker.recordHost(stdout)
}
} else {
rawResponse := jig.GetHTTPContent(targetIp, targetPort, timeout, "")
tracker.recordHost(rawResponse.String())
}
trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
if !shouldHold && !affinityHolds {
return true, nil
}
if shouldHold {
if !transitionState && !affinityHolds {
return true, fmt.Errorf("Affintity should hold but didn't.")
}
if trackerFulfilled && affinityHolds {
return true, nil
}
}
return false, nil
}); pollErr != nil {
trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
if pollErr != wait.ErrWaitTimeout {
checkAffinityFailed(tracker, pollErr.Error())
return false
} else {
if !trackerFulfilled {
checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIpPort))
}
if shouldHold {
checkAffinityFailed(tracker, "Affintity should hold but didn't.")
} else {
checkAffinityFailed(tracker, "Affintity shouldn't hold but did.")
}
return true
}
}
return true
}
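// Example usage (an illustrative sketch, not in the original file): asserting
// that ClientIP session affinity holds when hitting a ClusterIP service from
// an exec pod; `jig`, `execPod`, and `svc` are assumed:
//
//	if !CheckAffinity(jig, execPod, svc.Spec.ClusterIP, 80, true /*shouldHold*/, false /*transitionState*/) {
//		Failf("affinity was not preserved")
//	}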