1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
watch.go 19.78 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"fmt"
"math/rand"
"time"
"k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
const (
watchConfigMapLabelKey = "watch-this-configmap"
multipleWatchersLabelValueA = "multiple-watchers-A"
multipleWatchersLabelValueB = "multiple-watchers-B"
fromResourceVersionLabelValue = "from-resource-version"
watchRestartedLabelValue = "watch-closed-and-restarted"
toBeChangedLabelValue = "label-changed-and-restored"
)
var _ = SIGDescribe("Watchers", func() {
f := framework.NewDefaultFramework("watch")
/*
Testname: watch-configmaps-with-multiple-watchers
Description: Ensure that multiple watchers are able to receive all add,
update, and delete notifications on configmaps that match a label selector and do
not receive notifications for configmaps which do not match that label selector.
*/
framework.ConformanceIt("should observe add, update, and delete watch notifications on configmaps", func() {
c := f.ClientSet
ns := f.Namespace.Name
By("creating a watch on configmaps with label A")
watchA, err := watchConfigMaps(f, "", multipleWatchersLabelValueA)
Expect(err).NotTo(HaveOccurred(), "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueA)
By("creating a watch on configmaps with label B")
watchB, err := watchConfigMaps(f, "", multipleWatchersLabelValueB)
Expect(err).NotTo(HaveOccurred(), "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueB)
By("creating a watch on configmaps with label A or B")
watchAB, err := watchConfigMaps(f, "", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
Expect(err).NotTo(HaveOccurred(), "failed to create a watch on configmaps with label %s or %s", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
testConfigMapA := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "e2e-watch-test-configmap-a",
Labels: map[string]string{
watchConfigMapLabelKey: multipleWatchersLabelValueA,
},
},
}
testConfigMapB := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "e2e-watch-test-configmap-b",
Labels: map[string]string{
watchConfigMapLabelKey: multipleWatchersLabelValueB,
},
},
}
By("creating a configmap with label A and ensuring the correct watchers observe the notification")
testConfigMapA, err = c.CoreV1().ConfigMaps(ns).Create(testConfigMapA)
Expect(err).NotTo(HaveOccurred(), "failed to create a configmap with label %s in namespace: %s", multipleWatchersLabelValueA, ns)
expectEvent(watchA, watch.Added, testConfigMapA)
expectEvent(watchAB, watch.Added, testConfigMapA)
expectNoEvent(watchB, watch.Added, testConfigMapA)
By("modifying configmap A and ensuring the correct watchers observe the notification")
testConfigMapA, err = updateConfigMap(c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "1")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
expectEvent(watchA, watch.Modified, testConfigMapA)
expectEvent(watchAB, watch.Modified, testConfigMapA)
expectNoEvent(watchB, watch.Modified, testConfigMapA)
By("modifying configmap A again and ensuring the correct watchers observe the notification")
testConfigMapA, err = updateConfigMap(c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "2")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
expectEvent(watchA, watch.Modified, testConfigMapA)
expectEvent(watchAB, watch.Modified, testConfigMapA)
expectNoEvent(watchB, watch.Modified, testConfigMapA)
By("deleting configmap A and ensuring the correct watchers observe the notification")
err = c.CoreV1().ConfigMaps(ns).Delete(testConfigMapA.GetName(), nil)
Expect(err).NotTo(HaveOccurred(), "failed to delete configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
expectEvent(watchA, watch.Deleted, nil)
expectEvent(watchAB, watch.Deleted, nil)
expectNoEvent(watchB, watch.Deleted, nil)
By("creating a configmap with label B and ensuring the correct watchers observe the notification")
testConfigMapB, err = c.CoreV1().ConfigMaps(ns).Create(testConfigMapB)
Expect(err).NotTo(HaveOccurred(), "failed to create configmap %s in namespace: %s", testConfigMapB, ns)
expectEvent(watchB, watch.Added, testConfigMapB)
expectEvent(watchAB, watch.Added, testConfigMapB)
expectNoEvent(watchA, watch.Added, testConfigMapB)
By("deleting configmap B and ensuring the correct watchers observe the notification")
err = c.CoreV1().ConfigMaps(ns).Delete(testConfigMapB.GetName(), nil)
Expect(err).NotTo(HaveOccurred(), "failed to delete configmap %s in namespace: %s", testConfigMapB.GetName(), ns)
expectEvent(watchB, watch.Deleted, nil)
expectEvent(watchAB, watch.Deleted, nil)
expectNoEvent(watchA, watch.Deleted, nil)
})
/*
Testname: watch-configmaps-from-resource-version
Description: Ensure that a watch can be opened from a particular resource version
in the past and only notifications happening after that resource version are observed.
*/
framework.ConformanceIt("should be able to start watching from a specific resource version", func() {
c := f.ClientSet
ns := f.Namespace.Name
testConfigMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "e2e-watch-test-resource-version",
Labels: map[string]string{
watchConfigMapLabelKey: fromResourceVersionLabelValue,
},
},
}
By("creating a new configmap")
testConfigMap, err := c.CoreV1().ConfigMaps(ns).Create(testConfigMap)
Expect(err).NotTo(HaveOccurred(), "failed to create configmap %s in namespace: %s", testConfigMap.GetName(), ns)
By("modifying the configmap once")
testConfigMapFirstUpdate, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "1")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace: %s", testConfigMap.GetName(), ns)
By("modifying the configmap a second time")
testConfigMapSecondUpdate, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "2")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace %s a second time", testConfigMap.GetName(), ns)
By("deleting the configmap")
err = c.CoreV1().ConfigMaps(ns).Delete(testConfigMap.GetName(), nil)
Expect(err).NotTo(HaveOccurred(), "failed to delete configmap %s in namespace: %s", testConfigMap.GetName(), ns)
By("creating a watch on configmaps from the resource version returned by the first update")
testWatch, err := watchConfigMaps(f, testConfigMapFirstUpdate.ObjectMeta.ResourceVersion, fromResourceVersionLabelValue)
Expect(err).NotTo(HaveOccurred(), "failed to create a watch on configmaps from the resource version %s returned by the first update", testConfigMapFirstUpdate.ObjectMeta.ResourceVersion)
By("Expecting to observe notifications for all changes to the configmap after the first update")
expectEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
expectEvent(testWatch, watch.Deleted, nil)
})
/*
Testname: watch-configmaps-closed-and-restarted
Description: Ensure that a watch can be reopened from the last resource version
observed by the previous watch, and it will continue delivering notifications from
that point in time.
*/
framework.ConformanceIt("should be able to restart watching from the last resource version observed by the previous watch", func() {
c := f.ClientSet
ns := f.Namespace.Name
configMapName := "e2e-watch-test-watch-closed"
testConfigMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Labels: map[string]string{
watchConfigMapLabelKey: watchRestartedLabelValue,
},
},
}
By("creating a watch on configmaps")
testWatchBroken, err := watchConfigMaps(f, "", watchRestartedLabelValue)
Expect(err).NotTo(HaveOccurred(), "failed to create a watch on configmap with label: %s", watchRestartedLabelValue)
By("creating a new configmap")
testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(testConfigMap)
Expect(err).NotTo(HaveOccurred(), "failed to create configmap %s in namespace: %s", configMapName, ns)
By("modifying the configmap once")
_, err = updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "1")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace: %s", configMapName, ns)
By("closing the watch once it receives two notifications")
expectEvent(testWatchBroken, watch.Added, testConfigMap)
lastEvent, ok := waitForEvent(testWatchBroken, watch.Modified, nil, 1*time.Minute)
if !ok {
framework.Failf("Timed out waiting for second watch notification")
}
testWatchBroken.Stop()
By("modifying the configmap a second time, while the watch is closed")
testConfigMapSecondUpdate, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "2")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace %s a second time", configMapName, ns)
By("creating a new watch on configmaps from the last resource version observed by the first watch")
lastEventConfigMap, ok := lastEvent.Object.(*v1.ConfigMap)
if !ok {
framework.Failf("Expected last notification to refer to a configmap but got: %v", lastEvent)
}
testWatchRestarted, err := watchConfigMaps(f, lastEventConfigMap.ObjectMeta.ResourceVersion, watchRestartedLabelValue)
Expect(err).NotTo(HaveOccurred(), "failed to create a new watch on configmaps from the last resource version %s observed by the first watch", lastEventConfigMap.ObjectMeta.ResourceVersion)
By("deleting the configmap")
err = c.CoreV1().ConfigMaps(ns).Delete(testConfigMap.GetName(), nil)
Expect(err).NotTo(HaveOccurred(), "failed to delete configmap %s in namespace: %s", configMapName, ns)
By("Expecting to observe notifications for all changes to the configmap since the first watch closed")
expectEvent(testWatchRestarted, watch.Modified, testConfigMapSecondUpdate)
expectEvent(testWatchRestarted, watch.Deleted, nil)
})
/*
Testname: watch-configmaps-label-changed
Description: Ensure that a watched object stops meeting the requirements of
a watch's selector, the watch will observe a delete, and will not observe
notifications for that object until it meets the selector's requirements again.
*/
framework.ConformanceIt("should observe an object deletion if it stops meeting the requirements of the selector", func() {
c := f.ClientSet
ns := f.Namespace.Name
configMapName := "e2e-watch-test-label-changed"
testConfigMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Labels: map[string]string{
watchConfigMapLabelKey: toBeChangedLabelValue,
},
},
}
By("creating a watch on configmaps with a certain label")
testWatch, err := watchConfigMaps(f, "", toBeChangedLabelValue)
Expect(err).NotTo(HaveOccurred(), "failed to create a watch on configmap with label: %s", toBeChangedLabelValue)
By("creating a new configmap")
testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(testConfigMap)
Expect(err).NotTo(HaveOccurred(), "failed to create configmap %s in namespace: %s", configMapName, ns)
By("modifying the configmap once")
testConfigMapFirstUpdate, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "1")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace: %s", configMapName, ns)
By("changing the label value of the configmap")
_, err = updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
cm.ObjectMeta.Labels[watchConfigMapLabelKey] = "wrong-value"
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace %s by changing label value", configMapName, ns)
By("Expecting to observe a delete notification for the watched object")
expectEvent(testWatch, watch.Added, testConfigMap)
expectEvent(testWatch, watch.Modified, testConfigMapFirstUpdate)
expectEvent(testWatch, watch.Deleted, nil)
By("modifying the configmap a second time")
testConfigMapSecondUpdate, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "2")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace %s a second time", configMapName, ns)
By("Expecting not to observe a notification because the object no longer meets the selector's requirements")
expectNoEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
By("changing the label value of the configmap back")
testConfigMapLabelRestored, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
cm.ObjectMeta.Labels[watchConfigMapLabelKey] = toBeChangedLabelValue
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace %s by changing label value back", configMapName, ns)
By("modifying the configmap a third time")
testConfigMapThirdUpdate, err := updateConfigMap(c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
setConfigMapData(cm, "mutation", "3")
})
Expect(err).NotTo(HaveOccurred(), "failed to update configmap %s in namespace %s a third time", configMapName, ns)
By("deleting the configmap")
err = c.CoreV1().ConfigMaps(ns).Delete(testConfigMap.GetName(), nil)
Expect(err).NotTo(HaveOccurred(), "failed to delete configmap %s in namespace: %s", configMapName, ns)
By("Expecting to observe an add notification for the watched object when the label value was restored")
expectEvent(testWatch, watch.Added, testConfigMapLabelRestored)
expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate)
expectEvent(testWatch, watch.Deleted, nil)
})
/*
Testname: watch-consistency
Description: Ensure that concurrent watches are consistent with each other by initiating an additional watch
for events received from the first watch, initiated at the resource version of the event, and checking that all
resource versions of all events match. Events are produced from writes on a background goroutine.
*/
It("should receive events on concurrent watches in same order", func() {
c := f.ClientSet
ns := f.Namespace.Name
iterations := 100
By("starting a background goroutine to produce watch events")
donec := make(chan struct{})
stopc := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(donec)
produceConfigMapEvents(f, stopc, 5*time.Millisecond)
}()
By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
wcs := []watch.Interface{}
resourceVersion := "0"
for i := 0; i < iterations; i++ {
wc, err := c.CoreV1().ConfigMaps(ns).Watch(metav1.ListOptions{ResourceVersion: resourceVersion})
Expect(err).NotTo(HaveOccurred(), "Failed to watch configmaps in the namespace %s", ns)
wcs = append(wcs, wc)
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
for _, wc := range wcs[1:] {
e := waitForNextConfigMapEvent(wc)
if resourceVersion != e.ResourceVersion {
framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
}
}
}
close(stopc)
for _, wc := range wcs {
wc.Stop()
}
<-donec
})
})
func watchConfigMaps(f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) {
c := f.ClientSet
ns := f.Namespace.Name
opts := metav1.ListOptions{
ResourceVersion: resourceVersion,
LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: watchConfigMapLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: labels,
},
},
}),
}
return c.CoreV1().ConfigMaps(ns).Watch(opts)
}
func int64ptr(i int) *int64 {
i64 := int64(i)
return &i64
}
func setConfigMapData(cm *v1.ConfigMap, key, value string) {
if cm.Data == nil {
cm.Data = make(map[string]string)
}
cm.Data[key] = value
}
func expectEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
if event, ok := waitForEvent(w, eventType, object, 1*time.Minute); !ok {
framework.Failf("Timed out waiting for expected watch notification: %v", event)
}
}
func expectNoEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
if event, ok := waitForEvent(w, eventType, object, 10*time.Second); ok {
framework.Failf("Unexpected watch notification observed: %v", event)
}
}
func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object, duration time.Duration) (watch.Event, bool) {
stopTimer := time.NewTimer(duration)
defer stopTimer.Stop()
for {
select {
case actual, ok := <-w.ResultChan():
if ok {
framework.Logf("Got : %v %v", actual.Type, actual.Object)
} else {
framework.Failf("Watch closed unexpectedly")
}
if expectType == actual.Type && (expectObject == nil || apiequality.Semantic.DeepEqual(expectObject, actual.Object)) {
return actual, true
}
case <-stopTimer.C:
expected := watch.Event{
Type: expectType,
Object: expectObject,
}
return expected, false
}
}
}
func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
select {
case event := <-watch.ResultChan():
if configMap, ok := event.Object.(*v1.ConfigMap); ok {
return configMap
} else {
framework.Failf("expected config map")
}
case <-time.After(10 * time.Second):
framework.Failf("timed out waiting for watch event")
}
return nil // should never happen
}
const (
createEvent = iota
updateEvent
deleteEvent
)
func produceConfigMapEvents(f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) {
c := f.ClientSet
ns := f.Namespace.Name
name := func(i int) string {
return fmt.Sprintf("cm-%d", i)
}
existing := []int{}
tc := time.NewTicker(minWaitBetweenEvents)
defer tc.Stop()
i := 0
for range tc.C {
op := rand.Intn(3)
if len(existing) == 0 {
op = createEvent
}
cm := &v1.ConfigMap{}
switch op {
case createEvent:
cm.Name = name(i)
_, err := c.CoreV1().ConfigMaps(ns).Create(cm)
Expect(err).NotTo(HaveOccurred(), "Failed to create configmap %s in namespace %s", cm.Name, ns)
existing = append(existing, i)
i += 1
case updateEvent:
idx := rand.Intn(len(existing))
cm.Name = name(existing[idx])
_, err := c.CoreV1().ConfigMaps(ns).Update(cm)
Expect(err).NotTo(HaveOccurred(), "Failed to update configmap %s in namespace %s", cm.Name, ns)
case deleteEvent:
idx := rand.Intn(len(existing))
err := c.CoreV1().ConfigMaps(ns).Delete(name(existing[idx]), &metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred(), "Failed to delete configmap %s in namespace %s", name(existing[idx]), ns)
existing = append(existing[:idx], existing[idx+1:]...)
default:
framework.Failf("Unsupported event operation: %d", op)
}
select {
case <-stopc:
return
default:
}
}
}
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.14.1

搜索帮助