1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ingress.go 15.73 KB
一键复制 编辑 原始数据 按行查看 历史
Patrick Ohly 提交于 2018-08-01 12:46 . e2e: modular framework
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scale
import (
"fmt"
"io/ioutil"
"sync"
"time"
"k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/ingress"
"k8s.io/kubernetes/test/e2e/framework/providers/gce"
)
const (
numIngressesSmall = 5
numIngressesMedium = 20
numIngressesLarge = 50
numIngressesExtraLarge = 99
scaleTestIngressNamePrefix = "ing-scale"
scaleTestBackendName = "echoheaders-scale"
scaleTestSecretName = "tls-secret-scale"
scaleTestHostname = "scale.ingress.com"
scaleTestNumBackends = 10
scaleTestPollInterval = 15 * time.Second
// We don't expect waitForIngress to take longer
// than waitForIngressMaxTimeout.
waitForIngressMaxTimeout = 80 * time.Minute
ingressesCleanupTimeout = 80 * time.Minute
)
var (
scaleTestLabels = map[string]string{
"app": scaleTestBackendName,
}
)
// IngressScaleFramework defines the framework for ingress scale testing.
type IngressScaleFramework struct {
Clientset clientset.Interface
Jig *ingress.IngressTestJig
GCEController *gce.GCEIngressController
CloudConfig framework.CloudConfig
Logger ingress.TestLogger
Namespace string
EnableTLS bool
NumIngressesTest []int
OutputFile string
ScaleTestDeploy *extensions.Deployment
ScaleTestSvcs []*v1.Service
ScaleTestIngs []*extensions.Ingress
// BatchCreateLatencies stores all ingress creation latencies, in different
// batches.
BatchCreateLatencies [][]time.Duration
// BatchDurations stores the total duration for each ingress batch creation.
BatchDurations []time.Duration
// StepCreateLatencies stores the single ingress creation latency, which happens
// after each ingress batch creation is complete.
StepCreateLatencies []time.Duration
// StepCreateLatencies stores the single ingress update latency, which happens
// after each ingress batch creation is complete.
StepUpdateLatencies []time.Duration
}
// NewIngressScaleFramework returns a new framework for ingress scale testing.
func NewIngressScaleFramework(cs clientset.Interface, ns string, cloudConfig framework.CloudConfig) *IngressScaleFramework {
return &IngressScaleFramework{
Namespace: ns,
Clientset: cs,
CloudConfig: cloudConfig,
Logger: &ingress.E2ELogger{},
EnableTLS: true,
NumIngressesTest: []int{
numIngressesSmall,
numIngressesMedium,
numIngressesLarge,
numIngressesExtraLarge,
},
}
}
// PrepareScaleTest prepares framework for ingress scale testing.
func (f *IngressScaleFramework) PrepareScaleTest() error {
f.Logger.Infof("Initializing ingress test suite and gce controller...")
f.Jig = ingress.NewIngressTestJig(f.Clientset)
f.Jig.Logger = f.Logger
f.Jig.PollInterval = scaleTestPollInterval
f.GCEController = &gce.GCEIngressController{
Client: f.Clientset,
Cloud: f.CloudConfig,
}
if err := f.GCEController.Init(); err != nil {
return fmt.Errorf("Failed to initialize GCE controller: %v", err)
}
f.ScaleTestSvcs = []*v1.Service{}
f.ScaleTestIngs = []*extensions.Ingress{}
return nil
}
// CleanupScaleTest cleans up framework for ingress scale testing.
func (f *IngressScaleFramework) CleanupScaleTest() []error {
var errs []error
f.Logger.Infof("Cleaning up ingresses...")
for _, ing := range f.ScaleTestIngs {
if ing != nil {
if err := f.Clientset.ExtensionsV1beta1().Ingresses(ing.Namespace).Delete(ing.Name, nil); err != nil {
errs = append(errs, fmt.Errorf("Error while deleting ingress %s/%s: %v", ing.Namespace, ing.Name, err))
}
}
}
f.Logger.Infof("Cleaning up services...")
for _, svc := range f.ScaleTestSvcs {
if svc != nil {
if err := f.Clientset.CoreV1().Services(svc.Namespace).Delete(svc.Name, nil); err != nil {
errs = append(errs, fmt.Errorf("Error while deleting service %s/%s: %v", svc.Namespace, svc.Name, err))
}
}
}
if f.ScaleTestDeploy != nil {
f.Logger.Infof("Cleaning up deployment %s...", f.ScaleTestDeploy.Name)
if err := f.Clientset.ExtensionsV1beta1().Deployments(f.ScaleTestDeploy.Namespace).Delete(f.ScaleTestDeploy.Name, nil); err != nil {
errs = append(errs, fmt.Errorf("Error while delting deployment %s/%s: %v", f.ScaleTestDeploy.Namespace, f.ScaleTestDeploy.Name, err))
}
}
f.Logger.Infof("Cleaning up cloud resources...")
if err := f.GCEController.CleanupGCEIngressControllerWithTimeout(ingressesCleanupTimeout); err != nil {
errs = append(errs, err)
}
return errs
}
// RunScaleTest runs ingress scale testing.
func (f *IngressScaleFramework) RunScaleTest() []error {
var errs []error
testDeploy := generateScaleTestBackendDeploymentSpec(scaleTestNumBackends)
f.Logger.Infof("Creating deployment %s...", testDeploy.Name)
testDeploy, err := f.Jig.Client.ExtensionsV1beta1().Deployments(f.Namespace).Create(testDeploy)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to create deployment %s: %v", testDeploy.Name, err))
return errs
}
f.ScaleTestDeploy = testDeploy
if f.EnableTLS {
f.Logger.Infof("Ensuring TLS secret %s...", scaleTestSecretName)
if err := f.Jig.PrepareTLSSecret(f.Namespace, scaleTestSecretName, scaleTestHostname); err != nil {
errs = append(errs, fmt.Errorf("Failed to prepare TLS secret %s: %v", scaleTestSecretName, err))
return errs
}
}
// numIngsCreated keeps track of how many ingresses have been created.
numIngsCreated := 0
prepareIngsFunc := func(numIngsNeeded int) {
var ingWg sync.WaitGroup
numIngsToCreate := numIngsNeeded - numIngsCreated
ingWg.Add(numIngsToCreate)
svcQueue := make(chan *v1.Service, numIngsToCreate)
ingQueue := make(chan *extensions.Ingress, numIngsToCreate)
errQueue := make(chan error, numIngsToCreate)
latencyQueue := make(chan time.Duration, numIngsToCreate)
start := time.Now()
for ; numIngsCreated < numIngsNeeded; numIngsCreated++ {
suffix := fmt.Sprintf("%d", numIngsCreated)
go func() {
defer ingWg.Done()
start := time.Now()
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(suffix, f.EnableTLS)
svcQueue <- svcCreated
ingQueue <- ingCreated
if err != nil {
errQueue <- err
return
}
f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
errQueue <- err
return
}
elapsed := time.Since(start)
f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
latencyQueue <- elapsed
}()
}
// Wait until all ingress creations are complete.
f.Logger.Infof("Waiting for %d ingresses to come up...", numIngsToCreate)
ingWg.Wait()
close(svcQueue)
close(ingQueue)
close(errQueue)
close(latencyQueue)
elapsed := time.Since(start)
for svc := range svcQueue {
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svc)
}
for ing := range ingQueue {
f.ScaleTestIngs = append(f.ScaleTestIngs, ing)
}
var createLatencies []time.Duration
for latency := range latencyQueue {
createLatencies = append(createLatencies, latency)
}
f.BatchCreateLatencies = append(f.BatchCreateLatencies, createLatencies)
if len(errQueue) != 0 {
f.Logger.Errorf("Failed while creating services and ingresses, spent %v", elapsed)
for err := range errQueue {
errs = append(errs, err)
}
return
}
f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numIngsToCreate)
f.BatchDurations = append(f.BatchDurations, elapsed)
}
measureCreateUpdateFunc := func() {
f.Logger.Infof("Create one more ingress and wait for it to come up")
start := time.Now()
svcCreated, ingCreated, err := f.createScaleTestServiceIngress(fmt.Sprintf("%d", numIngsCreated), f.EnableTLS)
numIngsCreated = numIngsCreated + 1
f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
if err != nil {
errs = append(errs, err)
return
}
f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
if err := f.Jig.WaitForGivenIngressWithTimeout(ingCreated, false, waitForIngressMaxTimeout); err != nil {
errs = append(errs, err)
return
}
elapsed := time.Since(start)
f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
f.StepCreateLatencies = append(f.StepCreateLatencies, elapsed)
f.Logger.Infof("Updating ingress and wait for change to take effect")
ingToUpdate, err := f.Clientset.ExtensionsV1beta1().Ingresses(f.Namespace).Get(ingCreated.Name, metav1.GetOptions{})
if err != nil {
errs = append(errs, err)
return
}
addTestPathToIngress(ingToUpdate)
start = time.Now()
ingToUpdate, err = f.Clientset.ExtensionsV1beta1().Ingresses(f.Namespace).Update(ingToUpdate)
if err != nil {
errs = append(errs, err)
return
}
if err := f.Jig.WaitForGivenIngressWithTimeout(ingToUpdate, false, waitForIngressMaxTimeout); err != nil {
errs = append(errs, err)
return
}
elapsed = time.Since(start)
f.Logger.Infof("Spent %s for updating ingress %s", elapsed, ingToUpdate.Name)
f.StepUpdateLatencies = append(f.StepUpdateLatencies, elapsed)
}
defer f.dumpLatencies()
for _, num := range f.NumIngressesTest {
f.Logger.Infof("Create more ingresses until we reach %d ingresses", num)
prepareIngsFunc(num)
f.Logger.Infof("Measure create and update latency with %d ingresses", num)
measureCreateUpdateFunc()
if len(errs) != 0 {
return errs
}
}
return errs
}
func (f *IngressScaleFramework) dumpLatencies() {
f.Logger.Infof("Dumping scale test latencies...")
formattedData := f.GetFormattedLatencies()
if f.OutputFile != "" {
f.Logger.Infof("Dumping scale test latencies to file %s...", f.OutputFile)
ioutil.WriteFile(f.OutputFile, []byte(formattedData), 0644)
return
}
f.Logger.Infof("\n%v", formattedData)
}
// GetFormattedLatencies returns the formatted latencies output.
// TODO: Need a better way/format for data output.
func (f *IngressScaleFramework) GetFormattedLatencies() string {
if len(f.NumIngressesTest) == 0 ||
len(f.NumIngressesTest) != len(f.BatchCreateLatencies) ||
len(f.NumIngressesTest) != len(f.BatchDurations) ||
len(f.NumIngressesTest) != len(f.StepCreateLatencies) ||
len(f.NumIngressesTest) != len(f.StepUpdateLatencies) {
return "Failed to construct latencies output."
}
res := "--- Procedure logs ---\n"
for i, latencies := range f.BatchCreateLatencies {
res += fmt.Sprintf("Create %d ingresses parallelly, each of them takes below amount of time before starts serving traffic:\n", len(latencies))
for _, latency := range latencies {
res = res + fmt.Sprintf("- %v\n", latency)
}
res += fmt.Sprintf("Total duration for completing %d ingress creations: %v\n", len(latencies), f.BatchDurations[i])
res += fmt.Sprintf("Duration to create one more ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
res += fmt.Sprintf("Duration to update one ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
}
res = res + "--- Summary ---\n"
var batchTotalStr, batchAvgStr, singleCreateStr, singleUpdateStr string
for i, latencies := range f.BatchCreateLatencies {
batchTotalStr += fmt.Sprintf("Batch creation total latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), f.BatchDurations[i])
var avgLatency time.Duration
for _, latency := range latencies {
avgLatency = avgLatency + latency
}
avgLatency /= time.Duration(len(latencies))
batchAvgStr += fmt.Sprintf("Batch creation average latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), avgLatency)
singleCreateStr += fmt.Sprintf("Single ingress creation latency with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
singleUpdateStr += fmt.Sprintf("Single ingress update latency with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
}
res += batchTotalStr + batchAvgStr + singleCreateStr + singleUpdateStr
return res
}
func addTestPathToIngress(ing *extensions.Ingress) {
ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths = append(
ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths,
extensions.HTTPIngressPath{
Path: "/test",
Backend: ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend,
})
}
func (f *IngressScaleFramework) createScaleTestServiceIngress(suffix string, enableTLS bool) (*v1.Service, *extensions.Ingress, error) {
svcCreated, err := f.Clientset.CoreV1().Services(f.Namespace).Create(generateScaleTestServiceSpec(suffix))
if err != nil {
return nil, nil, err
}
ingCreated, err := f.Clientset.ExtensionsV1beta1().Ingresses(f.Namespace).Create(generateScaleTestIngressSpec(suffix, enableTLS))
if err != nil {
return nil, nil, err
}
return svcCreated, ingCreated, nil
}
func generateScaleTestIngressSpec(suffix string, enableTLS bool) *extensions.Ingress {
ing := &extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", scaleTestIngressNamePrefix, suffix),
},
Spec: extensions.IngressSpec{
TLS: []extensions.IngressTLS{
{SecretName: scaleTestSecretName},
},
Rules: []extensions.IngressRule{
{
Host: scaleTestHostname,
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/scale",
Backend: extensions.IngressBackend{
ServiceName: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
ServicePort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
},
},
}
if enableTLS {
ing.Spec.TLS = []extensions.IngressTLS{
{SecretName: scaleTestSecretName},
}
}
return ing
}
func generateScaleTestServiceSpec(suffix string) *v1.Service {
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
Labels: scaleTestLabels,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Name: "http",
Protocol: v1.ProtocolTCP,
Port: 80,
TargetPort: intstr.FromInt(8080),
}},
Selector: scaleTestLabels,
Type: v1.ServiceTypeNodePort,
},
}
}
func generateScaleTestBackendDeploymentSpec(numReplicas int32) *extensions.Deployment {
return &extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: scaleTestBackendName,
},
Spec: extensions.DeploymentSpec{
Replicas: &numReplicas,
Selector: &metav1.LabelSelector{MatchLabels: scaleTestLabels},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: scaleTestLabels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: scaleTestBackendName,
Image: "k8s.gcr.io/echoserver:1.10",
Ports: []v1.ContainerPort{{ContainerPort: 8080}},
ReadinessProbe: &v1.Probe{
Handler: v1.Handler{
HTTPGet: &v1.HTTPGetAction{
Port: intstr.FromInt(8080),
Path: "/healthz",
},
},
FailureThreshold: 10,
PeriodSeconds: 1,
SuccessThreshold: 1,
TimeoutSeconds: 1,
},
},
},
},
},
},
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.13.2-beta.0

搜索帮助