Skip to content

Commit

Permalink
✨ Allow non-blocking retrieval of informers (#2371)
Browse files Browse the repository at this point in the history
* Allow non-blocking retrieval of informers

Signed-off-by: Max Smythe <[email protected]>

Re-organize functional arguments

Signed-off-by: Max Smythe <[email protected]>

Add unit tests

Signed-off-by: Max Smythe <[email protected]>

Add deferred cancel call to test

Signed-off-by: Max Smythe <[email protected]>

Run gofmt

Signed-off-by: Max Smythe <[email protected]>

* Update pkg/cache/internal/informers.go

Co-authored-by: Stefan Büringer <[email protected]>

* Update pkg/cache/internal/informers.go

Co-authored-by: Stefan Büringer <[email protected]>

* Alias functional options

* Use private option for newInformer override

* Fix lint errors

---------

Signed-off-by: Max Smythe <[email protected]>
Co-authored-by: Stefan Büringer <[email protected]>
  • Loading branch information
maxsmythe and sbueringer authored Aug 22, 2023
1 parent 304027b commit c20ea14
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 27 deletions.
6 changes: 3 additions & 3 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,15 @@ type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(ctx, obj)
return c.Cache.GetInformer(ctx, obj, opts...)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

Expand Down
22 changes: 20 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ var (
defaultSyncPeriod = 10 * time.Hour
)

// InformerGetOptions defines the behavior of how informers are retrieved.
type InformerGetOptions internal.GetOptions

// InformerGetOption defines an option that alters the behavior of how informers are retrieved.
type InformerGetOption func(*InformerGetOptions)

// BlockUntilSynced determines whether a get request for an informer should block
// until the informer's cache has synced.
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
return func(opts *InformerGetOptions) {
opts.BlockUntilSynced = &shouldBlock
}
}

// Cache knows how to load Kubernetes objects, fetch informers to request
// to receive events for Kubernetes objects (at a low-level),
// and add indices to fields on the objects stored in the cache.
Expand All @@ -61,11 +75,11 @@ type Cache interface {
type Informers interface {
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
// API kind and resource.
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error)

// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)

// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Expand Down Expand Up @@ -187,6 +201,9 @@ type Options struct {
// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
// object, this will fall through to Default* settings.
ByObject map[client.Object]ByObject

// newInformer allows overriding of NewSharedIndexInformer for testing.
newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer
}

// ByObject offers more fine-grained control over the cache's ListWatch by object.
Expand Down Expand Up @@ -337,6 +354,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
},
Transform: config.Transform,
UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
NewInformer: opts.newInformer,
}),
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
}
Expand Down
91 changes: 90 additions & 1 deletion pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -43,6 +44,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
)

const testNodeOne = "test-node-1"
Expand Down Expand Up @@ -117,6 +119,7 @@ func deletePod(pod client.Object) {

var _ = Describe("Informer Cache", func() {
CacheTest(cache.New, cache.Options{})
NonBlockingGetTest(cache.New, cache.Options{})
})

var _ = Describe("Informer Cache with ReaderFailOnMissingInformer", func() {
Expand All @@ -131,12 +134,22 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
"default": {},
},
})
NonBlockingGetTest(cache.New, cache.Options{
DefaultNamespaces: map[string]cache.Config{
testNamespaceOne: {},
testNamespaceTwo: {},
"default": {},
},
})
})

var _ = Describe("Informer Cache without global DeepCopy", func() {
CacheTest(cache.New, cache.Options{
DefaultUnsafeDisableDeepCopy: pointer.Bool(true),
})
NonBlockingGetTest(cache.New, cache.Options{
DefaultUnsafeDisableDeepCopy: pointer.Bool(true),
})
})

var _ = Describe("Cache with transformers", func() {
Expand Down Expand Up @@ -440,7 +453,6 @@ func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Conf
BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating the informer cache")
var err error
informerCache, err = createCacheFunc(cfg, opts)
Expand Down Expand Up @@ -507,6 +519,83 @@ func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Conf
})
}

func NonBlockingGetTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
Describe("non-blocking get test", func() {
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
)
BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())

By("creating expected namespaces")
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
err = ensureNode(testNodeOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceTwo, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceThree, cl)
Expect(err).NotTo(HaveOccurred())

By("creating the informer cache")
v := reflect.ValueOf(&opts).Elem()
newInformerField := v.FieldByName("newInformer")
newFakeInformer := func(_ kcache.ListerWatcher, _ runtime.Object, _ time.Duration, _ kcache.Indexers) kcache.SharedIndexInformer {
return &controllertest.FakeInformer{Synced: false}
}
reflect.NewAt(newInformerField.Type(), newInformerField.Addr().UnsafePointer()).
Elem().
Set(reflect.ValueOf(&newFakeInformer))
informerCache, err = createCacheFunc(cfg, opts)
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})

AfterEach(func() {
By("cleaning up created pods")
informerCacheCancel()
})

Describe("as an Informer", func() {
It("should be able to get informer for the object without blocking", func() {
By("getting a shared index informer for a pod")
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "informer-obj",
Namespace: "default",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
sii, err := informerCache.GetInformer(ctx, pod, cache.BlockUntilSynced(false))
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeFalse())
})
})
})
}

func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) {
Describe("Cache test", func() {
var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/delegating_by_gvk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
return cache.List(ctx, list, opts...)
}

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return nil, err
}
return cache.GetInformer(ctx, obj)
return cache.GetInformer(ctx, obj, opts...)
}

func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk)
func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...)
}

func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
Expand Down
18 changes: 13 additions & 5 deletions pkg/cache/informer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,29 +141,37 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
return &gvk, cacheTypeObj, nil
}

func applyGetOptions(opts ...InformerGetOption) *internal.GetOptions {
cfg := &InformerGetOptions{}
for _, opt := range opts {
opt(cfg)
}
return (*internal.GetOptions)(cfg)
}

// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started.
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// Map the gvk to an object
obj, err := ic.scheme.New(gvk)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
if err != nil {
return nil, err
}
return i.Informer, nil
}

// GetInformer returns the informer for the obj. If no informer exists, one will be started.
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return nil, err
}

_, i, err := ic.Informers.Get(ctx, gvk, obj)
_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
if err != nil {
return nil, err
}
Expand All @@ -179,7 +187,7 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return started, cache, nil
}

return ic.Informers.Get(ctx, gvk, obj)
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type FakeInformers struct {
}

// GetInformerForKind implements Informers.
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand All @@ -61,7 +61,7 @@ func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.Grou
}

// GetInformer implements Informers.
func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
if c.Scheme == nil {
c.Scheme = scheme.Scheme
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/cache/internal/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ type InformersOpts struct {
Mapper meta.RESTMapper
ResyncPeriod time.Duration
Namespace string
NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
Selector Selector
Transform cache.TransformFunc
UnsafeDisableDeepCopy bool
}

// NewInformers creates a new InformersMap that can create informers under the hood.
func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
newInformer := cache.NewSharedIndexInformer
if options.NewInformer != nil {
newInformer = *options.NewInformer
}
return &Informers{
config: config,
httpClient: options.HTTPClient,
Expand All @@ -70,6 +75,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
selector: options.Selector,
transform: options.Transform,
unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
newInformer: newInformer,
}
}

Expand All @@ -88,6 +94,13 @@ type tracker struct {
Metadata map[schema.GroupVersionKind]*Cache
}

// GetOptions provides configuration to customize the behavior when
// getting an informer.
type GetOptions struct {
// BlockUntilSynced controls if the informer retrieval will block until the informer is synced. Defaults to `true`.
BlockUntilSynced *bool
}

// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type Informers struct {
Expand Down Expand Up @@ -143,6 +156,9 @@ type Informers struct {
selector Selector
transform cache.TransformFunc
unsafeDisableDeepCopy bool

// NewInformer allows overriding of the shared index informer constructor for testing.
newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -240,7 +256,7 @@ func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res

// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
// Return the informer if it is found
i, started, ok := ip.Peek(gvk, obj)
if !ok {
Expand All @@ -250,7 +266,12 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
}
}

if started && !i.Informer.HasSynced() {
shouldBlock := true
if opts.BlockUntilSynced != nil {
shouldBlock = *opts.BlockUntilSynced
}

if shouldBlock && started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
Expand Down Expand Up @@ -288,7 +309,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
if err != nil {
return nil, false, err
}
sharedIndexInformer := cache.NewSharedIndexInformer(&cache.ListWatch{
sharedIndexInformer := ip.newInformer(&cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selector.ApplyToList(&opts)
return listWatcher.ListFunc(opts)
Expand Down
Loading

0 comments on commit c20ea14

Please sign in to comment.