1 Star 0 Fork 0

zhuchance/kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
regional_pd.go 18.71 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"fmt"
"strings"
"time"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/kubelet/apis"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/providers/gce"
"k8s.io/kubernetes/test/e2e/storage/testsuites"
"k8s.io/kubernetes/test/e2e/storage/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
)
const (
pvDeletionTimeout = 3 * time.Minute
statefulSetReadyTimeout = 3 * time.Minute
)
var _ = utils.SIGDescribe("Regional PD", func() {
f := framework.NewDefaultFramework("regional-pd")
// filled in BeforeEach
var c clientset.Interface
var ns string
BeforeEach(func() {
c = f.ClientSet
ns = f.Namespace.Name
framework.SkipUnlessProviderIs("gce", "gke")
framework.SkipUnlessMultizone(c)
})
Describe("RegionalPD", func() {
It("should provision storage [Slow]", func() {
testVolumeProvisioning(c, ns)
})
It("should provision storage with delayed binding [Slow]", func() {
testRegionalDelayedBinding(c, ns)
})
It("should provision storage in the allowedTopologies [Slow]", func() {
testRegionalAllowedTopologies(c, ns)
})
It("should provision storage in the allowedTopologies with delayed binding [Slow]", func() {
testRegionalAllowedTopologiesWithDelayedBinding(c, ns)
})
It("should failover to a different zone when all nodes in one zone become unreachable [Slow] [Disruptive]", func() {
testZonalFailover(c, ns)
})
})
})
func testVolumeProvisioning(c clientset.Interface, ns string) {
cloudZones := getTwoRandomZones(c)
// This test checks that dynamic provisioning can provision a volume
// that can be used to persist data among pods.
tests := []testsuites.StorageClassTest{
{
Name: "HDD Regional PD on GCE/GKE",
CloudProviders: []string{"gce", "gke"},
Provisioner: "kubernetes.io/gce-pd",
Parameters: map[string]string{
"type": "pd-standard",
"zones": strings.Join(cloudZones, ","),
"replication-type": "regional-pd",
},
ClaimSize: "1.5Gi",
ExpectedSize: "2Gi",
PvCheck: func(volume *v1.PersistentVolume) error {
err := checkGCEPD(volume, "pd-standard")
if err != nil {
return err
}
return verifyZonesInPV(volume, sets.NewString(cloudZones...), true /* match */)
},
},
{
Name: "HDD Regional PD with auto zone selection on GCE/GKE",
CloudProviders: []string{"gce", "gke"},
Provisioner: "kubernetes.io/gce-pd",
Parameters: map[string]string{
"type": "pd-standard",
"replication-type": "regional-pd",
},
ClaimSize: "1.5Gi",
ExpectedSize: "2Gi",
PvCheck: func(volume *v1.PersistentVolume) error {
err := checkGCEPD(volume, "pd-standard")
if err != nil {
return err
}
zones, err := framework.GetClusterZones(c)
if err != nil {
return err
}
return verifyZonesInPV(volume, zones, false /* match */)
},
},
}
for _, test := range tests {
class := newStorageClass(test, ns, "" /* suffix */)
claim := newClaim(test, ns, "" /* suffix */)
claim.Spec.StorageClassName = &class.Name
testsuites.TestDynamicProvisioning(test, c, claim, class)
}
}
func testZonalFailover(c clientset.Interface, ns string) {
nodes := framework.GetReadySchedulableNodesOrDie(c)
nodeCount := len(nodes.Items)
cloudZones := getTwoRandomZones(c)
class := newRegionalStorageClass(ns, cloudZones)
claimTemplate := newClaimTemplate(ns)
claimTemplate.Spec.StorageClassName = &class.Name
statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)
By("creating a StorageClass " + class.Name)
_, err := c.StorageV1().StorageClasses().Create(class)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.Logf("deleting storage class %s", class.Name)
framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(class.Name, nil),
"Error deleting StorageClass %s", class.Name)
}()
By("creating a StatefulSet")
_, err = c.CoreV1().Services(ns).Create(service)
Expect(err).NotTo(HaveOccurred())
_, err = c.AppsV1().StatefulSets(ns).Create(statefulSet)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.Logf("deleting statefulset%q/%q", statefulSet.Namespace, statefulSet.Name)
// typically this claim has already been deleted
framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(statefulSet.Name, nil /* options */),
"Error deleting StatefulSet %s", statefulSet.Name)
framework.Logf("deleting claims in namespace %s", ns)
pvc := getPVC(c, ns, regionalPDLabels)
framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(pvc.Name, nil),
"Error deleting claim %s.", pvc.Name)
if pvc.Spec.VolumeName != "" {
err = framework.WaitForPersistentVolumeDeleted(c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
if err != nil {
framework.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
}
}
}()
err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
if err != nil {
pod := getPod(c, ns, regionalPDLabels)
Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(BeTrue(),
"The statefulset pod has the following conditions: %s", pod.Status.Conditions)
Expect(err).NotTo(HaveOccurred())
}
pvc := getPVC(c, ns, regionalPDLabels)
By("getting zone information from pod")
pod := getPod(c, ns, regionalPDLabels)
nodeName := pod.Spec.NodeName
node, err := c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
podZone := node.Labels[apis.LabelZoneFailureDomain]
// TODO (verult) Consider using node taints to simulate zonal failure instead.
By("deleting instance group belonging to pod's zone")
// Asynchronously detect a pod reschedule is triggered during/after instance group deletion.
waitStatus := make(chan error)
go func() {
waitStatus <- waitForStatefulSetReplicasNotReady(statefulSet.Name, ns, c)
}()
cloud, err := gce.GetGCECloud()
if err != nil {
Expect(err).NotTo(HaveOccurred())
}
instanceGroupName := framework.TestContext.CloudConfig.NodeInstanceGroup
instanceGroup, err := cloud.GetInstanceGroup(instanceGroupName, podZone)
Expect(err).NotTo(HaveOccurred(),
"Error getting instance group %s in zone %s", instanceGroupName, podZone)
templateName, err := framework.GetManagedInstanceGroupTemplateName(podZone)
Expect(err).NotTo(HaveOccurred(),
"Error getting instance group template in zone %s", podZone)
err = framework.DeleteManagedInstanceGroup(podZone)
Expect(err).NotTo(HaveOccurred(),
"Error deleting instance group in zone %s", podZone)
defer func() {
framework.Logf("recreating instance group %s", instanceGroup.Name)
framework.ExpectNoError(framework.CreateManagedInstanceGroup(instanceGroup.Size, podZone, templateName),
"Error recreating instance group %s in zone %s", instanceGroup.Name, podZone)
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount, framework.RestartNodeReadyAgainTimeout),
"Error waiting for nodes from the new instance group to become ready.")
}()
err = <-waitStatus
Expect(err).ToNot(HaveOccurred(), "Error waiting for replica to be deleted during failover: %v", err)
err = framework.WaitForStatefulSetReplicasReady(statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
if err != nil {
pod := getPod(c, ns, regionalPDLabels)
Expect(podutil.IsPodReadyConditionTrue(pod.Status)).To(BeTrue(),
"The statefulset pod has the following conditions: %s", pod.Status.Conditions)
Expect(err).NotTo(HaveOccurred())
}
By("verifying the same PVC is used by the new pod")
Expect(getPVC(c, ns, regionalPDLabels).Name).To(Equal(pvc.Name),
"The same PVC should be used after failover.")
By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
pod = getPod(c, ns, regionalPDLabels)
logs, err := framework.GetPodLogs(c, ns, pod.Name, "")
Expect(err).NotTo(HaveOccurred(),
"Error getting logs from pod %s in namespace %s", pod.Name, ns)
lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
expectedLineCount := 2
Expect(lineCount).To(Equal(expectedLineCount),
"Line count of the written file should be %d.", expectedLineCount)
// Verify the pod is scheduled in the other zone.
By("verifying the pod is scheduled in a different zone.")
var otherZone string
if cloudZones[0] == podZone {
otherZone = cloudZones[1]
} else {
otherZone = cloudZones[0]
}
nodeName = pod.Spec.NodeName
node, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
newPodZone := node.Labels[apis.LabelZoneFailureDomain]
Expect(newPodZone).To(Equal(otherZone),
"The pod should be scheduled in zone %s after all nodes in zone %s have been deleted", otherZone, podZone)
}
func testRegionalDelayedBinding(c clientset.Interface, ns string) {
test := testsuites.StorageClassTest{
Name: "Regional PD storage class with waitForFirstConsumer test on GCE",
Provisioner: "kubernetes.io/gce-pd",
Parameters: map[string]string{
"type": "pd-standard",
"replication-type": "regional-pd",
},
ClaimSize: "2Gi",
DelayBinding: true,
}
suffix := "delayed-regional"
class := newStorageClass(test, ns, suffix)
claim := newClaim(test, ns, suffix)
claim.Spec.StorageClassName = &class.Name
pv, node := testBindingWaitForFirstConsumer(c, claim, class)
if node == nil {
framework.Failf("unexpected nil node found")
}
zone, ok := node.Labels[kubeletapis.LabelZoneFailureDomain]
if !ok {
framework.Failf("label %s not found on Node", kubeletapis.LabelZoneFailureDomain)
}
checkZoneFromLabelAndAffinity(pv, zone, false)
}
func testRegionalAllowedTopologies(c clientset.Interface, ns string) {
test := testsuites.StorageClassTest{
Name: "Regional PD storage class with allowedTopologies test on GCE",
Provisioner: "kubernetes.io/gce-pd",
Parameters: map[string]string{
"type": "pd-standard",
"replication-type": "regional-pd",
},
ClaimSize: "2Gi",
ExpectedSize: "2Gi",
}
suffix := "topo-regional"
class := newStorageClass(test, ns, suffix)
zones := getTwoRandomZones(c)
addAllowedTopologiesToStorageClass(c, class, zones)
claim := newClaim(test, ns, suffix)
claim.Spec.StorageClassName = &class.Name
pv := testsuites.TestDynamicProvisioning(test, c, claim, class)
checkZonesFromLabelAndAffinity(pv, sets.NewString(zones...), true)
}
func testRegionalAllowedTopologiesWithDelayedBinding(c clientset.Interface, ns string) {
test := testsuites.StorageClassTest{
Name: "Regional PD storage class with allowedTopologies and waitForFirstConsumer test on GCE",
Provisioner: "kubernetes.io/gce-pd",
Parameters: map[string]string{
"type": "pd-standard",
"replication-type": "regional-pd",
},
ClaimSize: "2Gi",
DelayBinding: true,
}
suffix := "topo-delayed-regional"
class := newStorageClass(test, ns, suffix)
topoZones := getTwoRandomZones(c)
addAllowedTopologiesToStorageClass(c, class, topoZones)
claim := newClaim(test, ns, suffix)
claim.Spec.StorageClassName = &class.Name
pv, node := testBindingWaitForFirstConsumer(c, claim, class)
if node == nil {
framework.Failf("unexpected nil node found")
}
nodeZone, ok := node.Labels[kubeletapis.LabelZoneFailureDomain]
if !ok {
framework.Failf("label %s not found on Node", kubeletapis.LabelZoneFailureDomain)
}
zoneFound := false
for _, zone := range topoZones {
if zone == nodeZone {
zoneFound = true
break
}
}
if !zoneFound {
framework.Failf("zones specified in AllowedTopologies: %v does not contain zone of node where PV got provisioned: %s", topoZones, nodeZone)
}
checkZonesFromLabelAndAffinity(pv, sets.NewString(topoZones...), true)
}
func getPVC(c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim {
selector := labels.Set(pvcLabels).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(options)
Expect(err).NotTo(HaveOccurred())
Expect(len(pvcList.Items)).To(Equal(1), "There should be exactly 1 PVC matched.")
return &pvcList.Items[0]
}
func getPod(c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod {
selector := labels.Set(podLabels).AsSelector()
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := c.CoreV1().Pods(ns).List(options)
Expect(err).NotTo(HaveOccurred())
Expect(len(podList.Items)).To(Equal(1), "There should be exactly 1 pod matched.")
return &podList.Items[0]
}
func addAllowedTopologiesToStorageClass(c clientset.Interface, sc *storage.StorageClass, zones []string) {
term := v1.TopologySelectorTerm{
MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
{
Key: kubeletapis.LabelZoneFailureDomain,
Values: zones,
},
},
}
sc.AllowedTopologies = append(sc.AllowedTopologies, term)
}
// Generates the spec of a StatefulSet with 1 replica that mounts a Regional PD.
func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) {
var replicas int32 = 1
labels = map[string]string{"app": "regional-pd-workload"}
svc = &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "regional-pd-service",
Namespace: ns,
Labels: labels,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{{
Port: 80,
Name: "web",
}},
ClusterIP: v1.ClusterIPNone,
Selector: labels,
},
}
sts = &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "regional-pd-sts",
Namespace: ns,
},
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
ServiceName: svc.Name,
Replicas: &replicas,
Template: *newPodTemplate(labels),
VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate},
},
}
return
}
func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec {
return &v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
// This container writes its pod name to a file in the Regional PD
// and prints the entire file to stdout.
{
Name: "busybox",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"sh", "-c"},
Args: []string{
"echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" +
"cat /mnt/data/regional-pd/pods.txt;" +
"sleep 3600;",
},
Env: []v1.EnvVar{{
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
}},
Ports: []v1.ContainerPort{{
ContainerPort: 80,
Name: "web",
}},
VolumeMounts: []v1.VolumeMount{{
Name: "regional-pd-vol",
MountPath: "/mnt/data/regional-pd",
}},
},
},
},
}
}
func newClaimTemplate(ns string) *v1.PersistentVolumeClaim {
return &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "regional-pd-vol",
Namespace: ns,
},
Spec: v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
},
},
}
}
func newRegionalStorageClass(namespace string, zones []string) *storage.StorageClass {
return &storage.StorageClass{
TypeMeta: metav1.TypeMeta{
Kind: "StorageClass",
},
ObjectMeta: metav1.ObjectMeta{
Name: namespace + "-sc",
},
Provisioner: "kubernetes.io/gce-pd",
Parameters: map[string]string{
"type": "pd-standard",
"zones": strings.Join(zones, ","),
"replication-type": "regional-pd",
},
}
}
func getTwoRandomZones(c clientset.Interface) []string {
zones, err := framework.GetClusterZones(c)
Expect(err).ToNot(HaveOccurred())
Expect(zones.Len()).To(BeNumerically(">=", 2),
"The test should only be run in multizone clusters.")
zone1, _ := zones.PopAny()
zone2, _ := zones.PopAny()
return []string{zone1, zone2}
}
// Waits for at least 1 replica of a StatefulSet to become not ready or until timeout occurs, whichever comes first.
func waitForStatefulSetReplicasNotReady(statefulSetName, ns string, c clientset.Interface) error {
const poll = 3 * time.Second
const timeout = statefulSetReadyTimeout
framework.Logf("Waiting up to %v for StatefulSet %s to have at least 1 replica to become not ready", timeout, statefulSetName)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
sts, err := c.AppsV1().StatefulSets(ns).Get(statefulSetName, metav1.GetOptions{})
if err != nil {
framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, poll, err)
continue
} else {
if sts.Status.ReadyReplicas < *sts.Spec.Replicas {
framework.Logf("%d replicas are ready out of a total of %d replicas in StatefulSet %s. (%v)",
sts.Status.ReadyReplicas, *sts.Spec.Replicas, statefulSetName, time.Since(start))
return nil
} else {
framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
}
}
}
return fmt.Errorf("All replicas in StatefulSet %s are still ready within %v", statefulSetName, timeout)
}
// If match is true, check if zones in PV exactly match zones given.
// Otherwise, check whether zones in PV is superset of zones given.
func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
pvZones, err := util.LabelZonesToSet(volume.Labels[apis.LabelZoneFailureDomain])
if err != nil {
return err
}
if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
return nil
}
return fmt.Errorf("Zones in StorageClass are %v, but zones in PV are %v", zones, pvZones)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.13.0-alpha.2

搜索帮助