From 7b60362bc6e9fc5ee8f121f30dd86905247578af Mon Sep 17 00:00:00 2001 From: Nimrod Shneor Date: Sun, 24 Jan 2021 10:51:54 +0200 Subject: [PATCH] Add support for implicit paging in un/structured clients --- pkg/client/client_test.go | 48 +++++++++++++++++++++++++++ pkg/client/interfaces.go | 7 ++++ pkg/client/options.go | 6 ++++ pkg/client/typed_client.go | 67 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 128 insertions(+) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 2dc152b164..8e7ac5e545 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -19,6 +19,7 @@ package client_test import ( "context" "fmt" + "strconv" "sync/atomic" "time" @@ -47,6 +48,14 @@ func deleteDeployment(ctx context.Context, dep *appsv1.Deployment, ns string) { } } +func deletePod(ctx context.Context, pod *corev1.Pod, ns string) { + _, err := clientset.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{}) + if err == nil { + err = clientset.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + } +} + func deleteNamespace(ctx context.Context, ns *corev1.Namespace) { ns, err := clientset.CoreV1().Namespaces().Get(ctx, ns.Name, metav1.GetOptions{}) if err != nil { @@ -2085,6 +2094,45 @@ var _ = Describe("Client", func() { Expect(deps.Items[1].Name).To(Equal(dep4.Name)) }, serverSideTimeoutSeconds) + It("should list in pages large sets of objects using ListPages", func() { + buildPod := func(suffix string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%s", suffix), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}, + }, + } + } + + By("creating 150 pods") + workLoad := 150 + for workLoad > 0 { + pod := buildPod(strconv.Itoa(workLoad)) + defer deletePod(ctx, pod, ns) + pod, err := clientset. + CoreV1(). + Pods(ns). + Create(ctx, pod, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + workLoad-- + defer deletePod(ctx, pod, ns) + } + + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("listing all pods with ListPages") + pods := &corev1.PodList{} + err = cl.ListPages(context.Background(), pods, func(obj client.ObjectList) error { return nil }) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(150)) + Expect(pods.Continue).NotTo(BeEmpty()) + Expect(pods.Items[0].Name).To(Equal("pod-1")) + + }, serverSideTimeoutSeconds) + PIt("should fail if the object doesn't have meta", func() { }) diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 09636968f1..999bbe4800 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -55,6 +55,13 @@ type Reader interface { // successful call, Items field in the list will be populated with the // result returned from the server. List(ctx context.Context, list ObjectList, opts ...ListOption) error + + // Retrieves a list of objects in "chunks" (of size one hundred by default) + // for a given namespace and list options. + // One can pass a callback function to process each chunk recieved from the server. + // On a successful call, Items field in the list will be populated with the + // result returned from the server. + ListPages(ctx context.Context, obj ObjectList, callback func(obj ObjectList) error, opts ...ListOption) error } // Writer knows how to create, delete, and update Kubernetes objects. diff --git a/pkg/client/options.go b/pkg/client/options.go index f253276466..18e8811c6f 100644 --- a/pkg/client/options.go +++ b/pkg/client/options.go @@ -23,6 +23,12 @@ import ( "k8s.io/apimachinery/pkg/selection" ) +const ( + // DefaultPageLimit represents the default limit used for ListPaging when no "Limit" + // is specified as ListOption. + DefaultPageLimit = 100 +) + // {{{ "Functional" Option Interfaces // CreateOption is some configuration that modifies options for a create request. diff --git a/pkg/client/typed_client.go b/pkg/client/typed_client.go index a1b32653ca..b3a0438579 100644 --- a/pkg/client/typed_client.go +++ b/pkg/client/typed_client.go @@ -19,6 +19,7 @@ package client import ( "context" + apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" ) @@ -143,6 +144,72 @@ func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj Object) error Name(key.Name).Do(ctx).Into(obj) } +func (c *typedClient) ListPages(ctx context.Context, obj ObjectList, + callback func(obj ObjectList) error, opts ...ListOption) error { + r, err := c.cache.getResource(obj) + if err != nil { + return err + } + listOpts := ListOptions{} + listOpts.ApplyOptions(opts) + + // Fetch items at chunks of one hundred if not specified differently. + if listOpts.Limit == 0 { + Limit(DefaultPageLimit).ApplyToList(&listOpts) + } + + // Retrieve initial chunck of data. + var allItems []runtime.Object + var interimResult ObjectList + err = r.Get(). + NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()). + Resource(r.resource()). + VersionedParams(listOpts.AsListOptions(), c.paramCodec). + Do(ctx).Into(interimResult) + if err != nil { + return err + } + + if err := callback(interimResult); err != nil { + return err + } + + items, err := apimeta.ExtractList(interimResult) + if err != nil { + return err + } + allItems = append(allItems, items...) + + // Continue while there are more chunks. + for { + if interimResult.GetContinue() == "" { + break + } + + Continue(interimResult.GetContinue()).ApplyToList(&listOpts) + err = r.Get(). + NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()). + Resource(r.resource()). + VersionedParams(listOpts.AsListOptions(), c.paramCodec). + Do(ctx).Into(interimResult) + if err != nil { + return err + } + + if err := callback(interimResult); err != nil { + return err + } + + items, err = apimeta.ExtractList(interimResult) + if err != nil { + return err + } + allItems = append(allItems, items...) + } + + return apimeta.SetList(obj, allItems) +} + // List implements client.Client func (c *typedClient) List(ctx context.Context, obj ObjectList, opts ...ListOption) error { r, err := c.cache.getResource(obj)