代码拉取完成,页面将自动刷新
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"strconv"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/test/e2e/framework"
"github.com/golang/glog"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
defaultTimeout = 3 * time.Minute
resizeTimeout = 5 * time.Minute
scaleUpTimeout = 5 * time.Minute
scaleDownTimeout = 15 * time.Minute
gkeEndpoint = "https://test-container.sandbox.googleapis.com"
gkeUpdateTimeout = 10 * time.Minute
)
var _ = framework.KubeDescribe("Cluster size autoscaling [Slow]", func() {
f := framework.NewDefaultFramework("autoscaling")
var c *client.Client
var nodeCount int
var coresPerNode int
var memCapacityMb int
var originalSizes map[string]int
BeforeEach(func() {
c = f.Client
framework.SkipUnlessProviderIs("gce", "gke")
nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
nodeCount = len(nodes.Items)
Expect(nodeCount).NotTo(BeZero())
cpu := nodes.Items[0].Status.Capacity[api.ResourceCPU]
mem := nodes.Items[0].Status.Capacity[api.ResourceMemory]
coresPerNode = int((&cpu).MilliValue() / 1000)
memCapacityMb = int((&mem).Value() / 1024 / 1024)
originalSizes = make(map[string]int)
sum := 0
for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
size, err := GroupSize(mig)
framework.ExpectNoError(err)
By(fmt.Sprintf("Initial size of %s: %d", mig, size))
originalSizes[mig] = size
sum += size
}
Expect(nodeCount).Should(Equal(sum))
if framework.ProviderIs("gke") {
val, err := isAutoscalerEnabled(3)
framework.ExpectNoError(err)
if !val {
err = enableAutoscaler("default-pool", 3, 5)
framework.ExpectNoError(err)
}
}
})
AfterEach(func() {
By(fmt.Sprintf("Restoring initial size of the cluster"))
setMigSizes(originalSizes)
framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount, scaleDownTimeout))
})
It("shouldn't increase cluster size if pending pod is too large [Feature:ClusterSizeAutoscalingScaleUp]", func() {
By("Creating unschedulable pod")
ReserveMemory(f, "memory-reservation", 1, memCapacityMb, false)
defer framework.DeleteRC(f.Client, f.Namespace.Name, "memory-reservation")
By("Waiting for scale up hoping it won't happen")
// Verfiy, that the appropreate event was generated.
eventFound := false
EventsLoop:
for start := time.Now(); time.Since(start) < scaleUpTimeout; time.Sleep(20 * time.Second) {
By("Waiting for NotTriggerScaleUp event")
events, err := f.Client.Events(f.Namespace.Name).List(api.ListOptions{})
framework.ExpectNoError(err)
for _, e := range events.Items {
if e.InvolvedObject.Kind == "Pod" && e.Reason == "NotTriggerScaleUp" && strings.Contains(e.Message, "it wouldn't fit if a new node is added") {
By("NotTriggerScaleUp event found")
eventFound = true
break EventsLoop
}
}
}
Expect(eventFound).Should(Equal(true))
// Verify, that cluster size is not changed.
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size <= nodeCount }, time.Second))
})
It("should increase cluster size if pending pods are small [Feature:ClusterSizeAutoscalingScaleUp]", func() {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false)
defer framework.DeleteRC(f.Client, f.Namespace.Name, "memory-reservation")
// Verify, that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
})
It("should increase cluster size if pods are pending due to host port conflict [Feature:ClusterSizeAutoscalingScaleUp]", func() {
CreateHostPortPods(f, "host-port", nodeCount+2, false)
defer framework.DeleteRC(f.Client, f.Namespace.Name, "host-port")
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size >= nodeCount+2 }, scaleUpTimeout))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
})
It("should add node to the particular mig [Feature:ClusterSizeAutoscalingScaleUp]", func() {
labels := map[string]string{"cluster-autoscaling-test.special-node": "true"}
By("Finding the smallest MIG")
minMig := ""
minSize := nodeCount
for mig, size := range originalSizes {
if size <= minSize {
minMig = mig
minSize = size
}
}
removeLabels := func(nodesToClean sets.String) {
By("Removing labels from nodes")
updateNodeLabels(c, nodesToClean, nil, labels)
}
nodes, err := GetGroupNodes(minMig)
ExpectNoError(err)
nodesSet := sets.NewString(nodes...)
defer removeLabels(nodesSet)
By(fmt.Sprintf("Annotating nodes of the smallest MIG(%s): %v", minMig, nodes))
updateNodeLabels(c, nodesSet, labels, nil)
CreateNodeSelectorPods(f, "node-selector", minSize+1, labels, false)
By("Waiting for new node to appear and annotating it")
WaitForGroupSize(minMig, int32(minSize+1))
// Verify, that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
newNodes, err := GetGroupNodes(minMig)
ExpectNoError(err)
newNodesSet := sets.NewString(newNodes...)
newNodesSet.Delete(nodes...)
defer removeLabels(newNodesSet)
By(fmt.Sprintf("Setting labels for new nodes: %v", newNodesSet.List()))
updateNodeLabels(c, newNodesSet, labels, nil)
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.DeleteRC(f.Client, f.Namespace.Name, "node-selector"))
})
It("should scale up correct target pool [Feature:ClusterSizeAutoscalingScaleUp]", func() {
framework.SkipUnlessProviderIs("gke")
By("Creating new node-pool with one n1-standard-4 machine")
const extraPoolName = "extra-pool"
output, err := exec.Command("gcloud", "alpha", "container", "node-pools", "create", extraPoolName, "--quiet",
"--machine-type=n1-standard-4",
"--num-nodes=1",
"--project="+framework.TestContext.CloudConfig.ProjectID,
"--zone="+framework.TestContext.CloudConfig.Zone,
"--cluster="+framework.TestContext.CloudConfig.Cluster).CombinedOutput()
defer func() {
glog.Infof("Deleting node pool %s", extraPoolName)
output, err := exec.Command("gcloud", "alpha", "container", "node-pools", "delete", extraPoolName, "--quiet",
"--project="+framework.TestContext.CloudConfig.ProjectID,
"--zone="+framework.TestContext.CloudConfig.Zone,
"--cluster="+framework.TestContext.CloudConfig.Cluster).CombinedOutput()
if err != nil {
glog.Infof("Error: %v", err)
}
glog.Infof("Node-pool deletion output: %s", output)
}()
framework.ExpectNoError(err)
glog.Infof("Creating node-pool: %s", output)
framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount+1, resizeTimeout))
framework.ExpectNoError(enableAutoscaler(extraPoolName, 1, 2))
By("Creating rc with 2 pods too big to fit default-pool but fitting extra-pool")
ReserveMemory(f, "memory-reservation", 2, 2*memCapacityMb, false)
defer framework.DeleteRC(f.Client, f.Namespace.Name, "memory-reservation")
framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount+2, scaleUpTimeout))
})
It("should correctly scale down after a node is not needed [Feature:ClusterSizeAutoscalingScaleDown]", func() {
By("Manually increase cluster size")
increasedSize := 0
newSizes := make(map[string]int)
for key, val := range originalSizes {
newSizes[key] = val + 2
increasedSize += val + 2
}
setMigSizes(newSizes)
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size >= increasedSize }, scaleUpTimeout))
By("Some node should be removed")
framework.ExpectNoError(WaitForClusterSizeFunc(f.Client,
func(size int) bool { return size < increasedSize }, scaleDownTimeout))
})
})
func getGKEClusterUrl() string {
out, err := exec.Command("gcloud", "auth", "print-access-token").Output()
framework.ExpectNoError(err)
token := strings.Replace(string(out), "\n", "", -1)
return fmt.Sprintf("%s/v1/projects/%s/zones/%s/clusters/%s?access_token=%s",
gkeEndpoint,
framework.TestContext.CloudConfig.ProjectID,
framework.TestContext.CloudConfig.Zone,
framework.TestContext.CloudConfig.Cluster,
token)
}
func isAutoscalerEnabled(expectedMinNodeCountInTargetPool int) (bool, error) {
resp, err := http.Get(getGKEClusterUrl())
if err != nil {
return false, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, err
}
strBody := string(body)
glog.Infof("Cluster config %s", strBody)
if strings.Contains(strBody, "\"minNodeCount\": "+strconv.Itoa(expectedMinNodeCountInTargetPool)) {
return true, nil
}
return false, nil
}
func enableAutoscaler(nodePool string, minCount, maxCount int) error {
updateRequest := "{" +
" \"update\": {" +
" \"desiredNodePoolId\": \"" + nodePool + "\"," +
" \"desiredNodePoolAutoscaling\": {" +
" \"enabled\": \"true\"," +
" \"minNodeCount\": \"" + strconv.Itoa(minCount) + "\"," +
" \"maxNodeCount\": \"" + strconv.Itoa(maxCount) + "\"" +
" }" +
" }" +
"}"
url := getGKEClusterUrl()
glog.Infof("Using gke api url %s", url)
putResult, err := doPut(url, updateRequest)
if err != nil {
return fmt.Errorf("Failed to put %s: %v", url, err)
}
glog.Infof("Config update result: %s", putResult)
for startTime := time.Now(); startTime.Add(gkeUpdateTimeout).After(time.Now()); time.Sleep(30 * time.Second) {
if val, err := isAutoscalerEnabled(minCount); err == nil && val {
return nil
}
}
return fmt.Errorf("autoscaler not enabled")
}
func doPut(url, content string) (string, error) {
req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(content)))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
strBody := string(body)
return strBody, nil
}
func CreateNodeSelectorPods(f *framework.Framework, id string, replicas int, nodeSelector map[string]string, expectRunning bool) {
By(fmt.Sprintf("Running RC which reserves host port and defines node selector"))
config := &framework.RCConfig{
Client: f.Client,
Name: "node-selector",
Namespace: f.Namespace.Name,
Timeout: defaultTimeout,
Image: "gcr.io/google_containers/pause-amd64:3.0",
Replicas: replicas,
HostPorts: map[string]int{"port1": 4321},
NodeSelector: map[string]string{"cluster-autoscaling-test.special-node": "true"},
}
err := framework.RunRC(*config)
if expectRunning {
framework.ExpectNoError(err)
}
}
func CreateHostPortPods(f *framework.Framework, id string, replicas int, expectRunning bool) {
By(fmt.Sprintf("Running RC which reserves host port"))
config := &framework.RCConfig{
Client: f.Client,
Name: id,
Namespace: f.Namespace.Name,
Timeout: defaultTimeout,
Image: framework.GetPauseImageName(f.Client),
Replicas: replicas,
HostPorts: map[string]int{"port1": 4321},
}
err := framework.RunRC(*config)
if expectRunning {
framework.ExpectNoError(err)
}
}
func ReserveCpu(f *framework.Framework, id string, replicas, millicores int) {
By(fmt.Sprintf("Running RC which reserves %v millicores", millicores))
request := int64(millicores / replicas)
config := &framework.RCConfig{
Client: f.Client,
Name: id,
Namespace: f.Namespace.Name,
Timeout: defaultTimeout,
Image: framework.GetPauseImageName(f.Client),
Replicas: replicas,
CpuRequest: request,
}
framework.ExpectNoError(framework.RunRC(*config))
}
func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool) {
By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
request := int64(1024 * 1024 * megabytes / replicas)
config := &framework.RCConfig{
Client: f.Client,
Name: id,
Namespace: f.Namespace.Name,
Timeout: defaultTimeout,
Image: framework.GetPauseImageName(f.Client),
Replicas: replicas,
MemRequest: request,
}
err := framework.RunRC(*config)
if expectRunning {
framework.ExpectNoError(err)
}
}
// WaitForClusterSize waits until the cluster size matches the given function.
func WaitForClusterSizeFunc(c *client.Client, sizeFunc func(int) bool, timeout time.Duration) error {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(20 * time.Second) {
nodes, err := c.Nodes().List(api.ListOptions{FieldSelector: fields.Set{
"spec.unschedulable": "false",
}.AsSelector()})
if err != nil {
glog.Warningf("Failed to list nodes: %v", err)
continue
}
numNodes := len(nodes.Items)
// Filter out not-ready nodes.
framework.FilterNodes(nodes, func(node api.Node) bool {
return framework.IsNodeConditionSetAsExpected(&node, api.NodeReady, true)
})
numReady := len(nodes.Items)
if numNodes == numReady && sizeFunc(numReady) {
glog.Infof("Cluster has reached the desired size")
return nil
}
glog.Infof("Waiting for cluster, current size %d, not ready nodes %d", numNodes, numNodes-numReady)
}
return fmt.Errorf("timeout waiting %v for appropriate cluster size", timeout)
}
func waitForAllCaPodsReadyInNamespace(f *framework.Framework, c *client.Client) error {
var notready []string
for start := time.Now(); time.Now().Before(start.Add(scaleUpTimeout)); time.Sleep(20 * time.Second) {
pods, err := c.Pods(f.Namespace.Name).List(api.ListOptions{})
if err != nil {
return fmt.Errorf("failed to get pods: %v", err)
}
notready = make([]string, 0)
for _, pod := range pods.Items {
ready := false
for _, c := range pod.Status.Conditions {
if c.Type == api.PodReady && c.Status == api.ConditionTrue {
ready = true
}
}
if !ready {
notready = append(notready, pod.Name)
}
}
if len(notready) == 0 {
glog.Infof("All pods ready")
return nil
}
glog.Infof("Some pods are not ready yet: %v", notready)
}
glog.Info("Timeout on waiting for pods being ready")
glog.Info(framework.RunKubectlOrDie("get", "pods", "-o json", "--all-namespaces"))
glog.Info(framework.RunKubectlOrDie("get", "nodes", "-o json"))
// Some pods are still not running.
return fmt.Errorf("Some pods are still not running: %v", notready)
}
func setMigSizes(sizes map[string]int) {
for mig, desiredSize := range sizes {
currentSize, err := GroupSize(mig)
framework.ExpectNoError(err)
if desiredSize != currentSize {
By(fmt.Sprintf("Setting size of %s to %d", mig, desiredSize))
err = ResizeGroup(mig, int32(desiredSize))
framework.ExpectNoError(err)
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。