Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions test/e2e/apimachinery/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework"

"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -336,6 +338,11 @@ var _ = SIGDescribe("Watchers", func() {

iterations := 100

ginkgo.By("getting a starting resourceVersion")
configmaps, err := c.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
resourceVersion := configmaps.ResourceVersion

ginkgo.By("starting a background goroutine to produce watch events")
donec := make(chan struct{})
stopc := make(chan struct{})
Expand All @@ -345,11 +352,16 @@ var _ = SIGDescribe("Watchers", func() {
produceConfigMapEvents(f, stopc, 5*time.Millisecond)
}()

listWatcher := &cachetools.ListWatch{
WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
return c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), listOptions)
},
}

ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
wcs := []watch.Interface{}
resourceVersion := "0"
for i := 0; i < iterations; i++ {
wc, err := c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), metav1.ListOptions{ResourceVersion: resourceVersion})
wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
wcs = append(wcs, wc)
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
Expand Down Expand Up @@ -436,11 +448,14 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru

func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
select {
case event := <-watch.ResultChan():
case event, ok := <-watch.ResultChan():
if !ok {
framework.Failf("Watch closed unexpectedly")
}
if configMap, ok := event.Object.(*v1.ConfigMap); ok {
return configMap
}
framework.Failf("expected config map")
framework.Failf("expected config map, got %T", event.Object)
case <-time.After(10 * time.Second):
framework.Failf("timed out waiting for watch event")
}
Expand Down