diff --git a/designs/move-cluster-specific-code-out-of-manager.md b/designs/move-cluster-specific-code-out-of-manager.md index c01d796900..8bd7efebc3 100644 --- a/designs/move-cluster-specific-code-out-of-manager.md +++ b/designs/move-cluster-specific-code-out-of-manager.md @@ -203,7 +203,7 @@ func NewSecretMirrorReconciler(mgr manager.Manager, mirrorCluster cluster.Cluste func main(){ - mgr, err := manager.New(cfg1, manager.Options{}) + mgr, err := manager.New(context.Background(), cfg1, manager.Options{}) if err != nil { panic(err) } diff --git a/example_test.go b/example_test.go index 0117ba02c0..9e529db18b 100644 --- a/example_test.go +++ b/example_test.go @@ -38,7 +38,7 @@ import ( func Example() { var log = controllers.Log.WithName("builder-examples") - manager, err := controllers.NewManager(controllers.GetConfigOrDie(), controllers.Options{}) + manager, err := controllers.NewManager(context.Background(), controllers.GetConfigOrDie(), controllers.Options{}) if err != nil { log.Error(err, "could not create manager") os.Exit(1) @@ -78,11 +78,14 @@ func Example_updateLeaderElectionDurations() { leaseDuration := 100 * time.Second renewDeadline := 80 * time.Second retryPeriod := 20 * time.Second - manager, err := controllers.NewManager(controllers.GetConfigOrDie(), controllers.Options{ - LeaseDuration: &leaseDuration, - RenewDeadline: &renewDeadline, - RetryPeriod: &retryPeriod, - }) + manager, err := controllers.NewManager( + context.Background(), + controllers.GetConfigOrDie(), + controllers.Options{ + LeaseDuration: &leaseDuration, + RenewDeadline: &renewDeadline, + RetryPeriod: &retryPeriod, + }) if err != nil { log.Error(err, "could not create manager") os.Exit(1) diff --git a/examples/builtins/main.go b/examples/builtins/main.go index ff1f0dfa3b..2dafd6906f 100644 --- a/examples/builtins/main.go +++ b/examples/builtins/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "os" appsv1 "k8s.io/api/apps/v1" @@ -42,7 +43,7 @@ func main() { // Setup a Manager entryLog.Info("setting up manager") - mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{}) if err != nil { entryLog.Error(err, "unable to set up overall controller manager") os.Exit(1) diff --git a/examples/crd/main.go b/examples/crd/main.go index 1f6cd5fac2..058dc463bc 100644 --- a/examples/crd/main.go +++ b/examples/crd/main.go @@ -104,7 +104,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu func main() { ctrl.SetLogger(zap.New()) - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) + mgr, err := ctrl.NewManager(context.Background(), ctrl.GetConfigOrDie(), ctrl.Options{}) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 6050d008ca..2b6a49789f 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -79,7 +79,7 @@ var _ = Describe("application", func() { Describe("New", func() { It("should return success if given valid objects", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -92,7 +92,7 @@ var _ = Describe("application", func() { It("should return error if given two apiType objects in For function", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -106,7 +106,7 @@ var _ = Describe("application", func() { It("should return an error if For function is not called", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -118,7 +118,7 @@ var _ = Describe("application", func() { It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("creating a controller with a bad For type") @@ -141,7 +141,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -164,7 +164,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -186,7 +186,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -209,7 +209,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -230,7 +230,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -244,7 +244,7 @@ var _ = Describe("application", func() { It("should allow multiple controllers for the same kind", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -273,7 +273,7 @@ var _ = Describe("application", func() { Describe("Start with ControllerManagedBy", func() { It("should Reconcile Owns objects", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) bldr := ControllerManagedBy(m). @@ -284,7 +284,7 @@ var _ = Describe("application", func() { }, 10) It("should Reconcile Watches objects", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) bldr := ControllerManagedBy(m). @@ -299,7 +299,7 @@ var _ = Describe("application", func() { Describe("Set custom predicates", func() { It("should execute registered predicates only for assigned kind", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) var ( diff --git a/pkg/builder/example_test.go b/pkg/builder/example_test.go index 8dd7249516..207867d470 100644 --- a/pkg/builder/example_test.go +++ b/pkg/builder/example_test.go @@ -45,7 +45,7 @@ func ExampleBuilder() { var log = logf.Log.WithName("builder-examples") - mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{}) if err != nil { log.Error(err, "could not create manager") os.Exit(1) diff --git a/pkg/builder/example_webhook_test.go b/pkg/builder/example_webhook_test.go index 63333a2478..f2dacd4dbc 100644 --- a/pkg/builder/example_webhook_test.go +++ b/pkg/builder/example_webhook_test.go @@ -17,6 +17,7 @@ limitations under the License. package builder_test import ( + "context" "os" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -39,7 +40,7 @@ var _ admission.Validator = &examplegroup.ChaosPod{} func ExampleWebhookBuilder() { var log = logf.Log.WithName("webhookbuilder-example") - mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{}) if err != nil { log.Error(err, "could not create manager") os.Exit(1) diff --git a/pkg/builder/webhook_test.go b/pkg/builder/webhook_test.go index dc2ad9d7bc..102d9f6d7b 100644 --- a/pkg/builder/webhook_test.go +++ b/pkg/builder/webhook_test.go @@ -17,6 +17,7 @@ limitations under the License. package builder import ( + "context" "errors" "fmt" "net/http" @@ -49,7 +50,7 @@ var _ = Describe("webhook", func() { Describe("New", func() { It("should scaffold a defaulting webhook if the type implements the Defaulter interface", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -89,11 +90,11 @@ var _ = Describe("webhook", func() { } }`) - stopCh := make(chan struct{}) - close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // TODO: we may want to improve it to make it be able to inject dependencies, // but not always try to load certs and return not found error. - err = svr.Start(stopCh) + err = svr.Start(ctx) if err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } @@ -121,7 +122,7 @@ var _ = Describe("webhook", func() { It("should scaffold a validating webhook if the type implements the Validator interface", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -163,11 +164,11 @@ var _ = Describe("webhook", func() { } }`) - stopCh := make(chan struct{}) - close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // TODO: we may want to improve it to make it be able to inject dependencies, // but not always try to load certs and return not found error. - err = svr.Start(stopCh) + err = svr.Start(ctx) if err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } @@ -194,7 +195,7 @@ var _ = Describe("webhook", func() { It("should scaffold defaulting and validating webhooks if the type implements both Defaulter and Validator interfaces", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -234,11 +235,11 @@ var _ = Describe("webhook", func() { } }`) - stopCh := make(chan struct{}) - close(stopCh) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // TODO: we may want to improve it to make it be able to inject dependencies, // but not always try to load certs and return not found error. - err = svr.Start(stopCh) + err = svr.Start(ctx) if err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } @@ -269,7 +270,9 @@ var _ = Describe("webhook", func() { It("should scaffold a validating webhook if the type implements the Validator interface to validate deletes", func() { By("creating a controller manager") - m, err := manager.New(cfg, manager.Options{}) + ctx, cancel := context.WithCancel(context.Background()) + + m, err := manager.New(ctx, cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -308,11 +311,11 @@ var _ = Describe("webhook", func() { } } }`) - stopCh := make(chan struct{}) - close(stopCh) + + cancel() // TODO: we may want to improve it to make it be able to inject dependencies, // but not always try to load certs and return not found error. - err = svr.Start(stopCh) + err = svr.Start(ctx) if err != nil && !os.IsNotExist(err) { Expect(err).NotTo(HaveOccurred()) } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 0d7bdd849e..3fcef4f592 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -58,12 +58,12 @@ type Informers interface { // of the underlying object. GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) - // Start runs all the informers known to this cache until the given channel is closed. + // Start runs all the informers known to this cache until the context is closed. // It blocks. - Start(stopCh <-chan struct{}) error + Start(ctx context.Context) error // WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache. - WaitForCacheSync(stop <-chan struct{}) bool + WaitForCacheSync(ctx context.Context) bool // Informers knows how to add indices to the caches (informers) that it manages. client.FieldIndexer diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 440b5d2fb0..bdcf458c16 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -83,16 +83,17 @@ var _ = Describe("Multi-Namespace Informer Cache", func() { func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error)) { Describe("Cache test", func() { var ( - informerCache cache.Cache - stop chan struct{} - knownPod1 runtime.Object - knownPod2 runtime.Object - knownPod3 runtime.Object - knownPod4 runtime.Object + informerCache cache.Cache + informerCacheCtx context.Context + informerCacheCancel context.CancelFunc + knownPod1 runtime.Object + knownPod2 runtime.Object + knownPod3 runtime.Object + knownPod4 runtime.Object ) BeforeEach(func() { - stop = make(chan struct{}) + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) Expect(cfg).NotTo(BeNil()) By("creating three pods") @@ -123,11 +124,11 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Expect(err).NotTo(HaveOccurred()) By("running the cache and waiting for it to sync") // pass as an arg so that we don't race between close and re-assign - go func(stopCh chan struct{}) { + go func(ctx context.Context) { defer GinkgoRecover() - Expect(informerCache.Start(stopCh)).To(Succeed()) - }(stop) - Expect(informerCache.WaitForCacheSync(stop)).To(BeTrue()) + Expect(informerCache.Start(ctx)).To(Succeed()) + }(informerCacheCtx) + Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) }) AfterEach(func() { @@ -137,7 +138,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca deletePod(knownPod3) deletePod(knownPod4) - close(stop) + informerCacheCancel() }) Describe("as a Reader", func() { @@ -270,13 +271,12 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca }) It("should return an error when context is cancelled", func() { - By("creating a context and cancelling it") - ctx, cancel := context.WithCancel(context.Background()) - cancel() + By("cancelling the context") + informerCacheCancel() By("listing pods in test-namespace-1 with a cancelled context") listObj := &kcorev1.PodList{} - err := informerCache.List(ctx, listObj, client.InNamespace(testNamespaceOne)) + err := informerCache.List(informerCacheCtx, listObj, client.InNamespace(testNamespaceOne)) By("verifying that an error is returned") Expect(err).To(HaveOccurred()) @@ -396,9 +396,9 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca By("running the cache and waiting for it to sync") go func() { defer GinkgoRecover() - Expect(namespacedCache.Start(stop)).To(Succeed()) + Expect(namespacedCache.Start(informerCacheCtx)).To(Succeed()) }() - Expect(namespacedCache.WaitForCacheSync(stop)).NotTo(BeFalse()) + Expect(namespacedCache.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse()) By("listing pods in all namespaces") out := &unstructured.UnstructuredList{} @@ -574,9 +574,9 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca By("running the cache and waiting for it to sync") go func() { defer GinkgoRecover() - Expect(informer.Start(stop)).To(Succeed()) + Expect(informer.Start(informerCacheCtx)).To(Succeed()) }() - Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) + Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse()) By("listing Pods with restartPolicyOnFailure") listObj := &kcorev1.PodList{} @@ -591,8 +591,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca It("should allow for get informer to be cancelled", func() { By("creating a context and cancelling it") - ctx, cancel := context.WithCancel(context.Background()) - cancel() + informerCacheCancel() By("getting a shared index informer for a pod with a cancelled context") pod := &kcorev1.Pod{ @@ -609,7 +608,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca }, }, } - sii, err := informerCache.GetInformer(ctx, pod) + sii, err := informerCache.GetInformer(informerCacheCtx, pod) Expect(err).To(HaveOccurred()) Expect(sii).To(BeNil()) Expect(errors.IsTimeout(err)).To(BeTrue()) @@ -617,12 +616,11 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca It("should allow getting an informer by group/version/kind to be cancelled", func() { By("creating a context and cancelling it") - ctx, cancel := context.WithCancel(context.Background()) - cancel() + informerCacheCancel() By("getting an shared index informer for gvk = core/v1/pod with a cancelled context") gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} - sii, err := informerCache.GetInformerForKind(ctx, gvk) + sii, err := informerCache.GetInformerForKind(informerCacheCtx, gvk) Expect(err).To(HaveOccurred()) Expect(sii).To(BeNil()) Expect(errors.IsTimeout(err)).To(BeTrue()) @@ -636,7 +634,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Object: map[string]interface{}{ "spec": map[string]interface{}{ "containers": []map[string]interface{}{ - map[string]interface{}{ + { "name": "nginx", "image": "nginx", }, @@ -702,9 +700,9 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca By("running the cache and waiting for it to sync") go func() { defer GinkgoRecover() - Expect(informer.Start(stop)).To(Succeed()) + Expect(informer.Start(informerCacheCtx)).To(Succeed()) }() - Expect(informer.WaitForCacheSync(stop)).NotTo(BeFalse()) + Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse()) By("listing Pods with restartPolicyOnFailure") listObj := &unstructured.UnstructuredList{} @@ -725,9 +723,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca }, 3) It("should allow for get informer to be cancelled", func() { - By("creating a context and cancelling it") - ctx, cancel := context.WithCancel(context.Background()) - cancel() + By("cancelling the context") + informerCacheCancel() By("getting a shared index informer for a pod with a cancelled context") pod := &unstructured.Unstructured{} @@ -738,7 +735,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Version: "v1", Kind: "Pod", }) - sii, err := informerCache.GetInformer(ctx, pod) + sii, err := informerCache.GetInformer(informerCacheCtx, pod) Expect(err).To(HaveOccurred()) Expect(sii).To(BeNil()) Expect(errors.IsTimeout(err)).To(BeTrue()) diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index e686208296..4e7bb5bc7f 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -80,7 +80,7 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj runtime.Object) (ca } // WaitForCacheSync implements Informers -func (c *FakeInformers) WaitForCacheSync(stop <-chan struct{}) bool { +func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool { if c.Synced == nil { return true } @@ -121,7 +121,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec } // Start implements Informers -func (c *FakeInformers) Start(stopCh <-chan struct{}) error { +func (c *FakeInformers) Start(ctx context.Context) error { return c.Error } diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index cdaf1fc21c..a7ee643482 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -56,26 +56,26 @@ func NewInformersMap(config *rest.Config, } } -// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel. -func (m *InformersMap) Start(stop <-chan struct{}) error { - go m.structured.Start(stop) - go m.unstructured.Start(stop) - <-stop +// Start calls Run on each of the informers and sets started to true. Blocks on the context. +func (m *InformersMap) Start(ctx context.Context) error { + go m.structured.Start(ctx) + go m.unstructured.Start(ctx) + <-ctx.Done() return nil } // WaitForCacheSync waits until all the caches have been started and synced. -func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool { +func (m *InformersMap) WaitForCacheSync(ctx context.Context) bool { syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...) syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...) - if !m.structured.waitForStarted(stop) { + if !m.structured.waitForStarted(ctx) { return false } - if !m.unstructured.waitForStarted(stop) { + if !m.unstructured.waitForStarted(ctx) { return false } - return cache.WaitForCacheSync(stop, syncedFuncs...) + return cache.WaitForCacheSync(ctx.Done(), syncedFuncs...) } // Get will create a new Informer and add it to the map of InformersMap if none exists. Returns diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 1068a17383..bc450d8781 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -121,33 +121,33 @@ type specificInformersMap struct { namespace string } -// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel. +// Start calls Run on each of the informers and sets started to true. Blocks on the context. // It doesn't return start because it can't return an error, and it's not a runnable directly. -func (ip *specificInformersMap) Start(stop <-chan struct{}) { +func (ip *specificInformersMap) Start(ctx context.Context) { func() { ip.mu.Lock() defer ip.mu.Unlock() // Set the stop channel so it can be passed to informers that are added later - ip.stop = stop + ip.stop = ctx.Done() // Start each informer for _, informer := range ip.informersByGVK { - go informer.Informer.Run(stop) + go informer.Informer.Run(ctx.Done()) } // Set started to true so we immediately start any informers added later. ip.started = true close(ip.startWait) }() - <-stop + <-ctx.Done() } -func (ip *specificInformersMap) waitForStarted(stop <-chan struct{}) bool { +func (ip *specificInformersMap) waitForStarted(ctx context.Context) bool { select { case <-ip.startWait: return true - case <-stop: + case <-ctx.Done(): return false } } diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 175437d9be..ec5dd1fe5b 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -94,23 +94,23 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema return &multiNamespaceInformer{namespaceToInformer: informers}, nil } -func (c *multiNamespaceCache) Start(stopCh <-chan struct{}) error { +func (c *multiNamespaceCache) Start(ctx context.Context) error { for ns, cache := range c.namespaceToCache { go func(ns string, cache Cache) { - err := cache.Start(stopCh) + err := cache.Start(ctx) if err != nil { log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns) } }(ns, cache) } - <-stopCh + <-ctx.Done() return nil } -func (c *multiNamespaceCache) WaitForCacheSync(stop <-chan struct{}) bool { +func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool { synced := true for _, cache := range c.namespaceToCache { - if s := cache.WaitForCacheSync(stop); !s { + if s := cache.WaitForCacheSync(ctx); !s { synced = s } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 4af40366f8..cfd50b1087 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -17,6 +17,7 @@ limitations under the License. package controller import ( + "context" "fmt" "github.com/go-logr/logr" @@ -64,9 +65,9 @@ type Controller interface { // EventHandler if all provided Predicates evaluate to true. Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error - // Start starts the controller. Start blocks until stop is closed or a + // Start starts the controller. Start blocks until the context is closed or a // controller has an error starting. - Start(stop <-chan struct{}) error + Start(ctx context.Context) error } // New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 91600cb617..0f31c33fb7 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -56,7 +56,7 @@ var _ = Describe("controller", func() { It("should reconcile", func(done Done) { By("Creating the Manager") - cm, err := manager.New(cfg, manager.Options{}) + cm, err := manager.New(ctx, cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") @@ -78,9 +78,9 @@ var _ = Describe("controller", func() { err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}) Expect(err).NotTo(HaveOccurred()) - err = cm.GetClient().Get(context.Background(), types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) + err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{}) Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) - err = cm.GetClient().List(context.Background(), &corev1.NamespaceList{}) + err = cm.GetClient().List(ctx, &corev1.NamespaceList{}) Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) By("Starting the Manager") diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 04c484e575..cf37a989f5 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -32,22 +32,13 @@ import ( ) var _ = Describe("controller.Controller", func() { - var stop chan struct{} - rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { return reconcile.Result{}, nil }) - BeforeEach(func() { - stop = make(chan struct{}) - }) - - AfterEach(func() { - close(stop) - }) Describe("New", func() { It("should return an error if Name is not Specified", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("", m, controller.Options{Reconciler: rec}) Expect(c).To(BeNil()) @@ -57,7 +48,7 @@ var _ = Describe("controller.Controller", func() { }) It("should return an error if Reconciler is not Specified", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("foo", m, controller.Options{}) @@ -68,7 +59,7 @@ var _ = Describe("controller.Controller", func() { }) It("NewController should return an error if injecting Reconciler fails", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("foo", m, controller.Options{Reconciler: &failRec{}}) @@ -80,7 +71,7 @@ var _ = Describe("controller.Controller", func() { }) It("should not return an error if two controllers are registered with different names", func(done Done) { - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c1, err := controller.New("c1", m, controller.Options{Reconciler: rec}) @@ -97,7 +88,7 @@ var _ = Describe("controller.Controller", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) _, err = controller.New("new-controller", m, controller.Options{Reconciler: rec}) @@ -117,7 +108,7 @@ var _ = Describe("controller.Controller", func() { It("should not create goroutines if never started", func() { currentGRs := goleak.IgnoreCurrent() - m, err := manager.New(cfg, manager.Options{}) + m, err := manager.New(context.Background(), cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) _, err = controller.New("new-controller", m, controller.Options{Reconciler: rec}) diff --git a/pkg/controller/example_test.go b/pkg/controller/example_test.go index 4603e56940..3d8e399703 100644 --- a/pkg/controller/example_test.go +++ b/pkg/controller/example_test.go @@ -144,9 +144,7 @@ func ExampleNewUnmanaged() { os.Exit(1) } - // Create a stop channel for our controller. The controller will stop when - // this channel is closed. - stop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) // Start our controller in a goroutine so that we do not block. go func() { @@ -155,13 +153,13 @@ func ExampleNewUnmanaged() { // to handle that. <-mgr.Elected() - // Start our controller. This will block until the stop channel is + // Start our controller. This will block until the context is // closed, or the controller returns an error. - if err := c.Start(stop); err != nil { + if err := c.Start(ctx); err != nil { log.Error(err, "cannot run experiment controller") } }() // Stop our controller. - close(stop) + cancel() } diff --git a/pkg/envtest/webhook_test.go b/pkg/envtest/webhook_test.go index 03a477e3db..4a71e6bd00 100644 --- a/pkg/envtest/webhook_test.go +++ b/pkg/envtest/webhook_test.go @@ -22,7 +22,7 @@ var _ = Describe("Test", func() { Describe("Webhook", func() { It("should reject create request for webhook that rejects all requests", func(done Done) { - m, err := manager.New(env.Config, manager.Options{ + m, err := manager.New(context.Background(), env.Config, manager.Options{ Port: env.WebhookInstallOptions.LocalServingPort, Host: env.WebhookInstallOptions.LocalServingHost, CertDir: env.WebhookInstallOptions.LocalServingCertDir, @@ -31,9 +31,9 @@ var _ = Describe("Test", func() { server := m.GetWebhookServer() server.Register("/failing", &webhook.Admission{Handler: &rejectingValidator{}}) - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) go func() { - _ = server.Start(stopCh) + _ = server.Start(ctx) }() c, err := client.New(env.Config, client.Options{}) @@ -71,7 +71,7 @@ var _ = Describe("Test", func() { return errors.ReasonForError(err) == metav1.StatusReason("Always denied") }, 1*time.Second).Should(BeTrue()) - close(stopCh) + cancel() close(done) }) diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 26f20458f3..c62fc2681b 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -72,7 +72,12 @@ type Controller struct { // Started is true if the Controller has been Started Started bool - // TODO(community): Consider initializing a logger with the Controller Name as the tag + // ctx is the context that was passed to Start() and used when starting watches. + // + // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, + // while we usually always strive to follow best practices, we consider this a legacy case and it should + // undergo a major refactoring and redesign to allow for context to not be stored in a struct. + ctx context.Context // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription @@ -122,11 +127,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } c.Log.Info("Starting EventSource", "source", src) - return src.Start(evthdler, c.Queue, prct...) + return src.Start(c.ctx, evthdler, c.Queue, prct...) } // Start implements controller.Controller -func (c *Controller) Start(stop <-chan struct{}) error { +func (c *Controller) Start(ctx context.Context) error { // use an IIFE to get proper lock handling // but lock outside to get proper handling of the queue shutdown c.mu.Lock() @@ -134,6 +139,9 @@ func (c *Controller) Start(stop <-chan struct{}) error { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } + // Set the internal context. + c.ctx = ctx + c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed @@ -148,7 +156,7 @@ func (c *Controller) Start(stop <-chan struct{}) error { // caches. for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(watch.handler, c.Queue, watch.predicates...); err != nil { + if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } @@ -161,7 +169,7 @@ func (c *Controller) Start(stop <-chan struct{}) error { if !ok { continue } - if err := syncingSource.WaitForSync(stop); err != nil { + if err := syncingSource.WaitForSync(ctx); err != nil { // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error // Leaving it here because that could happen in the future err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) @@ -184,8 +192,12 @@ func (c *Controller) Start(stop <-chan struct{}) error { c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles) ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles)) for i := 0; i < c.MaxConcurrentReconciles; i++ { - // Process work items - go wait.Until(c.worker, c.JitterPeriod, stop) + go wait.UntilWithContext(ctx, func(ctx context.Context) { + // Run a worker thread that just dequeues items, processes them, and marks them done. + // It enforces that the reconcileHandler is never invoked concurrently with the same object. + for c.processNextWorkItem(ctx) { + } + }, c.JitterPeriod) } c.Started = true @@ -195,21 +207,14 @@ func (c *Controller) Start(stop <-chan struct{}) error { return err } - <-stop + <-ctx.Done() c.Log.Info("Stopping workers") return nil } -// worker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the reconcileHandler is never invoked concurrently with the same object. -func (c *Controller) worker() { - for c.processNextWorkItem() { - } -} - // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. -func (c *Controller) processNextWorkItem() bool { +func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() if shutdown { // Stop working @@ -227,10 +232,10 @@ func (c *Controller) processNextWorkItem() bool { ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) - return c.reconcileHandler(obj) + return c.reconcileHandler(ctx, obj) } -func (c *Controller) reconcileHandler(obj interface{}) bool { +func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) bool { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { @@ -250,7 +255,7 @@ func (c *Controller) reconcileHandler(obj interface{}) bool { } log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace) - ctx := logf.IntoContext(context.Background(), log) + ctx = logf.IntoContext(ctx, log) // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the // resource to be synced. diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index dac42e69bd..10c6e34d88 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -46,14 +46,12 @@ var _ = Describe("controller", func() { var ctrl *Controller var queue *controllertest.Queue var informers *informertest.FakeInformers - var stop chan struct{} var reconciled chan reconcile.Request var request = reconcile.Request{ NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}, } BeforeEach(func() { - stop = make(chan struct{}) reconciled = make(chan reconcile.Request) fakeReconcile = &fakeReconciler{ Requests: reconciled, @@ -72,16 +70,15 @@ var _ = Describe("controller", func() { Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed()) }) - AfterEach(func() { - close(stop) - }) - Describe("Reconciler", func() { It("should call the Reconciler function", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { return reconcile.Result{Requeue: true}, nil }) - result, err := ctrl.Reconcile(context.Background(), + result, err := ctrl.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}) Expect(err).NotTo(HaveOccurred()) Expect(result).To(Equal(reconcile.Result{Requeue: true})) @@ -95,7 +92,9 @@ var _ = Describe("controller", func() { src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}), }} ctrl.Name = "foo" - err := ctrl.Start(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := ctrl.Start(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to wait for foo caches to sync")) @@ -105,10 +104,6 @@ var _ = Describe("controller", func() { It("should wait for each informer to sync", func(done Done) { // TODO(directxman12): this test doesn't do what it says it does - // Use a stopped channel so Start doesn't block - stopped := make(chan struct{}) - close(stopped) - c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) _, err = c.GetInformer(context.TODO(), &appsv1.Deployment{}) @@ -119,7 +114,10 @@ var _ = Describe("controller", func() { src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}), }} - Expect(ctrl.Start(stopped)).NotTo(HaveOccurred()) + // Use a cancelled context so Start doesn't block + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) close(done) }) @@ -129,7 +127,7 @@ var _ = Describe("controller", func() { pr2 := &predicate.Funcs{} evthdl := &handler.EnqueueRequestForObject{} started := false - src := source.Func(func(e handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error { + src := source.Func(func(ctx context.Context, e handler.EventHandler, q workqueue.RateLimitingInterface, p ...predicate.Predicate) error { defer GinkgoRecover() Expect(e).To(Equal(evthdl)) Expect(q).To(Equal(ctrl.Queue)) @@ -140,16 +138,16 @@ var _ = Describe("controller", func() { }) Expect(ctrl.Watch(src, evthdl, pr1, pr2)).NotTo(HaveOccurred()) - // Use a stopped channel so Start doesn't block - stopped := make(chan struct{}) - close(stopped) - Expect(ctrl.Start(stopped)).To(Succeed()) + // Use a cancelled context so Start doesn't block + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(ctrl.Start(ctx)).To(Succeed()) Expect(started).To(BeTrue()) }) It("should return an error if there is an error starting sources", func() { err := fmt.Errorf("Expected Error: could not start source") - src := source.Func(func(handler.EventHandler, + src := source.Func(func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error { defer GinkgoRecover() @@ -157,18 +155,17 @@ var _ = Describe("controller", func() { }) Expect(ctrl.Watch(src, &handler.EnqueueRequestForObject{})).To(Succeed()) - // Use a stopped channel so Start doesn't block - stopped := make(chan struct{}) - close(stopped) - Expect(ctrl.Start(stopped)).To(Equal(err)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + Expect(ctrl.Start(ctx)).To(Equal(err)) }) It("should return an error if it gets started more than once", func() { - // Use a stopped channel so Start doesn't block - stopped := make(chan struct{}) - close(stopped) - Expect(ctrl.Start(stopped)).To(BeNil()) - err := ctrl.Start(stopped) + // Use a cancelled context so Start doesn't block + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(ctrl.Start(ctx)).To(BeNil()) + err := ctrl.Start(ctx) Expect(err).NotTo(BeNil()) Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times")) }) @@ -297,9 +294,11 @@ var _ = Describe("controller", func() { Describe("Processing queue items from a Controller", func() { It("should call Reconciler if an item is enqueued", func(done Done) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() queue.Add(request) @@ -320,9 +319,11 @@ var _ = Describe("controller", func() { Fail("Reconciler should not have been called") return reconcile.Result{}, nil }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() By("adding two bad items to the queue") @@ -344,9 +345,11 @@ var _ = Describe("controller", func() { // Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun. ctrl.JitterPeriod = time.Millisecond + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() queue.Add(request) @@ -371,9 +374,12 @@ var _ = Describe("controller", func() { It("should not reset backoff until there's a non-error result", func() { dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() dq.Add(request) @@ -404,9 +410,12 @@ var _ = Describe("controller", func() { It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() { dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() dq.Add(request) @@ -431,9 +440,12 @@ var _ = Describe("controller", func() { It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() { dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() dq.Add(request) @@ -459,9 +471,12 @@ var _ = Describe("controller", func() { dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()} ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq } ctrl.JitterPeriod = time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() dq.Add(request) @@ -511,9 +526,11 @@ var _ = Describe("controller", func() { return nil }()).Should(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will succeed") queue.Add(request) @@ -540,9 +557,11 @@ var _ = Describe("controller", func() { return nil }()).Should(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will give an error") queue.Add(request) @@ -569,9 +588,11 @@ var _ = Describe("controller", func() { return nil }()).Should(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will return result with Requeue enabled") @@ -599,9 +620,11 @@ var _ = Describe("controller", func() { return nil }()).Should(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() By("Invoking Reconciler which will return result with requeueAfter enabled") queue.Add(request) @@ -632,9 +655,11 @@ var _ = Describe("controller", func() { return nil }()).Should(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() queue.Add(request) @@ -677,9 +702,11 @@ var _ = Describe("controller", func() { return nil }()).Should(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(ctrl.Start(stop)).NotTo(HaveOccurred()) + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) }() queue.Add(request) diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index 34606be438..25b05fa1e3 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -37,7 +37,7 @@ import ( var _ = Describe("recorder", func() { var stop chan struct{} - ctx := context.TODO() + ctx := context.Background() BeforeEach(func() { stop = make(chan struct{}) @@ -51,7 +51,7 @@ var _ = Describe("recorder", func() { Describe("recorder", func() { It("should publish events", func(done Done) { By("Creating the Manager") - cm, err := manager.New(cfg, manager.Options{}) + cm, err := manager.New(ctx, cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") diff --git a/pkg/manager/example_test.go b/pkg/manager/example_test.go index c24a2bc55d..5b1cbf98a5 100644 --- a/pkg/manager/example_test.go +++ b/pkg/manager/example_test.go @@ -17,6 +17,7 @@ limitations under the License. package manager_test import ( + "context" "os" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -40,7 +41,7 @@ func ExampleNew() { os.Exit(1) } - mgr, err := manager.New(cfg, manager.Options{}) + mgr, err := manager.New(context.Background(), cfg, manager.Options{}) if err != nil { log.Error(err, "unable to set up manager") os.Exit(1) @@ -56,7 +57,7 @@ func ExampleNew_multinamespaceCache() { os.Exit(1) } - mgr, err := manager.New(cfg, manager.Options{ + mgr, err := manager.New(context.Background(), cfg, manager.Options{ NewCache: cache.MultiNamespacedCacheBuilder([]string{"namespace1", "namespace2"}), }) if err != nil { @@ -68,7 +69,7 @@ func ExampleNew_multinamespaceCache() { // This example adds a Runnable for the Manager to Start. func ExampleManager_add() { - err := mgr.Add(manager.RunnableFunc(func(<-chan struct{}) error { + err := mgr.Add(manager.RunnableFunc(func(context.Context) error { // Do something return nil })) @@ -80,8 +81,7 @@ func ExampleManager_add() { // This example starts a Manager that has had Runnables added. func ExampleManager_start() { - err := mgr.Start(signals.SetupSignalHandler()) - if err != nil { + if err := mgr.Start(signals.SetupSignalHandler()); err != nil { log.Error(err, "unable start the manager") os.Exit(1) } diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 120a01fcd3..07f4af7799 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -128,16 +128,6 @@ type controllerManager struct { healthzStarted bool errChan chan error - // internalStop is the stop channel *actually* used by everything involved - // with the manager as a stop channel, so that we can pass a stop channel - // to things that need it off the bat (like the Channel source). It can - // be closed via `internalStopper` (by being the same underlying channel). - internalStop <-chan struct{} - - // internalStopper is the write side of the internal stop channel, allowing us to close it. - // It and `internalStop` should point to the same channel. - internalStopper chan<- struct{} - // Logger is the logger that should be used by this manager. // If none is set, it defaults to log.Log global logger. logger logr.Logger @@ -155,7 +145,7 @@ type controllerManager struct { // election was configured. elected chan struct{} - startCache func(stop <-chan struct{}) error + startCache func(ctx context.Context) error // port is the port that the webhook server serves at. port int @@ -194,6 +184,9 @@ type controllerManager struct { // after the gracefulShutdownTimeout ended. It must not be accessed before internalStop // is closed because it will be nil. shutdownCtx context.Context + + internalCtx context.Context + internalCancel context.CancelFunc } // Add sets dependencies on i, and adds it to the list of Runnables to start. @@ -247,7 +240,7 @@ func (cm *controllerManager) SetFields(i interface{}) error { if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } - if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil { + if _, err := inject.StopChannelInto(cm.internalCtx.Done(), i); err != nil { return err } if _, err := inject.MapperInto(cm.mapper, i); err != nil { @@ -385,7 +378,7 @@ func (cm *controllerManager) GetLogger() logr.Logger { return cm.logger } -func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { +func (cm *controllerManager) serveMetrics() { handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, }) @@ -406,7 +399,7 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { Handler: mux, } // Run the server - cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { + cm.startRunnable(RunnableFunc(func(_ context.Context) error { log.Info("starting metrics server", "path", defaultMetricsEndpoint) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { return err @@ -415,13 +408,13 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) { })) // Shutdown the server when stop is closed - <-stop + <-cm.internalCtx.Done() if err := server.Shutdown(cm.shutdownCtx); err != nil { cm.errChan <- err } } -func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) { +func (cm *controllerManager) serveHealthProbes() { // TODO(hypnoglow): refactor locking to use anonymous func in the similar way // it's done in serveMetrics. cm.mu.Lock() @@ -442,7 +435,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) { Handler: mux, } // Run server - cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { + cm.startRunnable(RunnableFunc(func(_ context.Context) error { if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed { return err } @@ -452,7 +445,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) { cm.mu.Unlock() // Shutdown the server when stop is closed - <-stop + <-cm.internalCtx.Done() if err := server.Shutdown(cm.shutdownCtx); err != nil { cm.errChan <- err } @@ -462,7 +455,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request stopComplete := make(chan struct{}) defer close(stopComplete) - // This must be deferred after closing stopComplete, otherwise we deadlock + // This must be deferred after closing stopComplete, otherwise we deadlock. defer func() { // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg stopErr := cm.engageStopProcedure(stopComplete) @@ -489,12 +482,12 @@ func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { // (If we don't serve metrics for non-leaders, prometheus will still scrape // the pod but will get a connection refused) if cm.metricsListener != nil { - go cm.serveMetrics(cm.internalStop) + go cm.serveMetrics() } // Serve health probes if cm.healthProbeListener != nil { - go cm.serveHealthProbes(cm.internalStop) + go cm.serveHealthProbes() } go cm.startNonLeaderElectionRunnables() @@ -524,15 +517,19 @@ func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { // engageStopProcedure signals all runnables to stop, reads potential errors // from the errChan and waits for them to end. It must not be called more than once. -func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error { - var cancel context.CancelFunc +func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error { + // Populate the shutdown context. + var shutdownCancel context.CancelFunc if cm.gracefulShutdownTimeout > 0 { - cm.shutdownCtx, cancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) + cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout) } else { - cm.shutdownCtx, cancel = context.WithCancel(context.Background()) + cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background()) } - defer cancel() - close(cm.internalStopper) + defer shutdownCancel() + + // Cancel the internal context and wait for the stop procedures to complete. + cm.internalCancel() + // Start draining the errors before acquiring the lock to make sure we don't deadlock // if something that has the lock is blocked on trying to write into the unbuffered // channel after something else already wrote into it. @@ -559,15 +556,12 @@ func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) err // want things like leader election to try and emit events on a closed // channel defer cm.recorderProvider.Stop(cm.shutdownCtx) - - return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel) + return cm.waitForRunnableToEnd(shutdownCancel) } // waitForRunnableToEnd blocks until all runnables ended or the // tearDownTimeout was reached. In the latter case, an error is returned. -func (cm *controllerManager) waitForRunnableToEnd(ctx context.Context, cancel context.CancelFunc) error { - defer cancel() - +func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelFunc) error { // Cancel leader election only after we waited. It will os.Exit() the app for safety. defer func() { if cm.leaderElectionCancel != nil { @@ -577,11 +571,11 @@ func (cm *controllerManager) waitForRunnableToEnd(ctx context.Context, cancel co go func() { cm.waitForRunnable.Wait() - cancel() + shutdownCancel() }() - <-ctx.Done() - if err := ctx.Err(); err != nil && err != context.Canceled { + <-cm.shutdownCtx.Done() + if err := cm.shutdownCtx.Err(); err != nil && err != context.Canceled { return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err) } return nil @@ -591,7 +585,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() { cm.mu.Lock() defer cm.mu.Unlock() - cm.waitForCache() + cm.waitForCache(cm.internalCtx) // Start the non-leaderelection Runnables after the cache has synced for _, c := range cm.nonLeaderElectionRunnables { @@ -605,7 +599,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() { cm.mu.Lock() defer cm.mu.Unlock() - cm.waitForCache() + cm.waitForCache(cm.internalCtx) // Start the leader election Runnables after the cache has synced for _, c := range cm.leaderElectionRunnables { @@ -617,7 +611,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() { cm.startedLeader = true } -func (cm *controllerManager) waitForCache() { +func (cm *controllerManager) waitForCache(ctx context.Context) { if cm.started { return } @@ -626,13 +620,13 @@ func (cm *controllerManager) waitForCache() { if cm.startCache == nil { cm.startCache = cm.cache.Start } - cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error { - return cm.startCache(stop) + cm.startRunnable(RunnableFunc(func(ctx context.Context) error { + return cm.startCache(ctx) })) // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(cm.internalStop) + cm.cache.WaitForCacheSync(ctx) // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse // cm.started as check if we already started the cache so it must always become true. // Making sure that the cache doesn't get started twice is needed to not get a "close @@ -688,7 +682,7 @@ func (cm *controllerManager) startRunnable(r Runnable) { cm.waitForRunnable.Add(1) go func() { defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalStop); err != nil { + if err := r.Start(cm.internalCtx); err != nil { cm.errChan <- err } }() diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 34c3609c77..90e34dffc1 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -17,6 +17,7 @@ limitations under the License. package manager import ( + "context" "fmt" "net" "net/http" @@ -271,19 +272,19 @@ type NewClientFunc func(cache cache.Cache, config *rest.Config, options client.O // it's done running. type Runnable interface { // Start starts running the component. The component will stop running - // when the channel is closed. Start blocks until the channel is closed or + // when the context is closed. Start blocks until the context is closed or // an error occurs. - Start(<-chan struct{}) error + Start(context.Context) error } // RunnableFunc implements Runnable using a function. // It's very important that the given function block // until it's done running. -type RunnableFunc func(<-chan struct{}) error +type RunnableFunc func(context.Context) error // Start implements Runnable -func (r RunnableFunc) Start(s <-chan struct{}) error { - return r(s) +func (r RunnableFunc) Start(ctx context.Context) error { + return r(ctx) } // LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode. @@ -294,7 +295,7 @@ type LeaderElectionRunnable interface { } // New returns a new Manager for creating Controllers. -func New(config *rest.Config, options Options) (Manager, error) { +func New(ctx context.Context, config *rest.Config, options Options) (Manager, error) { // Initialize a rest.config if none was specified if config == nil { return nil, fmt.Errorf("must specify Config") @@ -370,7 +371,7 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } - stop := make(chan struct{}) + internalCtx, internalCancel := context.WithCancel(ctx) return &controllerManager{ config: config, @@ -385,8 +386,6 @@ func New(config *rest.Config, options Options) (Manager, error) { metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, logger: options.Logger, - internalStop: stop, - internalStopper: stop, elected: make(chan struct{}), port: options.Port, host: options.Host, @@ -398,6 +397,8 @@ func New(config *rest.Config, options Options) (Manager, error) { readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, + internalCtx: internalCtx, + internalCancel: internalCancel, }, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 8a3fdabc76..0b0efee3d3 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -25,6 +25,7 @@ import ( "net/http" "path" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -63,7 +64,7 @@ var _ = Describe("manger.Manager", func() { Describe("New", func() { It("should return an error if there is no Config", func() { - m, err := New(nil, Options{}) + m, err := New(context.Background(), nil, Options{}) Expect(m).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) @@ -71,7 +72,7 @@ var _ = Describe("manger.Manager", func() { It("should return an error if it can't create a RestMapper", func() { expected := fmt.Errorf("expected error: RestMapper") - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ MapperProvider: func(c *rest.Config) (meta.RESTMapper, error) { return nil, expected }, }) Expect(m).To(BeNil()) @@ -80,7 +81,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error it can't create a client.Client", func(done Done) { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { return nil, fmt.Errorf("expected error") }, @@ -93,7 +94,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error it can't create a cache.Cache", func(done Done) { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { return nil, fmt.Errorf("expected error") }, @@ -106,7 +107,7 @@ var _ = Describe("manger.Manager", func() { }) It("should create a client defined in by the new client function", func(done Done) { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { return nil, nil }, @@ -119,7 +120,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error it can't create a recorder.Provider", func(done Done) { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ newRecorderProvider: func(_ *rest.Config, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { return nil, fmt.Errorf("expected error") }, @@ -133,7 +134,7 @@ var _ = Describe("manger.Manager", func() { It("should lazily initialize a webhook server if needed", func(done Done) { By("creating a manager with options") - m, err := New(cfg, Options{Port: 9440, Host: "foo.com"}) + m, err := New(context.Background(), cfg, Options{Port: 9440, Host: "foo.com"}) Expect(err).NotTo(HaveOccurred()) Expect(m).NotTo(BeNil()) @@ -148,7 +149,7 @@ var _ = Describe("manger.Manager", func() { Context("with leader election enabled", func() { It("should only cancel the leader election after all runnables are done", func() { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id-2", @@ -158,8 +159,8 @@ var _ = Describe("manger.Manager", func() { Expect(err).To(BeNil()) runnableDone := make(chan struct{}) - slowRunnable := RunnableFunc(func(s <-chan struct{}) error { - <-s + slowRunnable := RunnableFunc(func(ctx context.Context) error { + <-ctx.Done() time.Sleep(100 * time.Millisecond) close(runnableDone) return nil @@ -193,7 +194,7 @@ var _ = Describe("manger.Manager", func() { }) It("should disable gracefulShutdown when stopping to lead", func() { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id-3", @@ -220,7 +221,7 @@ var _ = Describe("manger.Manager", func() { }) It("should default ID to controller-runtime if ID is not set", func() { var rl resourcelock.Interface - m1, err := New(cfg, Options{ + m1, err := New(context.Background(), cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id", @@ -240,7 +241,7 @@ var _ = Describe("manger.Manager", func() { Expect(ok).To(BeTrue()) m1cm.onStoppedLeading = func() {} - m2, err := New(cfg, Options{ + m2, err := New(context.Background(), cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id", @@ -261,22 +262,24 @@ var _ = Describe("manger.Manager", func() { m2cm.onStoppedLeading = func() {} c1 := make(chan struct{}) - Expect(m1.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m1.Add(RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() close(c1) return nil }))).To(Succeed()) + m1Stop := make(chan struct{}) + defer close(m1Stop) go func() { defer GinkgoRecover() Expect(m1.Elected()).ShouldNot(BeClosed()) - Expect(m1.Start(stop)).NotTo(HaveOccurred()) + Expect(m1.Start(m1Stop)).NotTo(HaveOccurred()) Expect(m1.Elected()).Should(BeClosed()) }() <-c1 c2 := make(chan struct{}) - Expect(m2.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m2.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() close(c2) return nil @@ -297,7 +300,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error if it can't create a ResourceLock", func() { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ newResourceLock: func(_ *rest.Config, _ recorder.Provider, _ leaderelection.Options) (resourcelock.Interface, error) { return nil, fmt.Errorf("expected error") }, @@ -306,7 +309,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).To(MatchError(ContainSubstring("expected error"))) }) It("should return an error if namespace not set and not running in cluster", func() { - m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) + m, err := New(context.Background(), cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) Expect(m).To(BeNil()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("unable to find leader election namespace: not running in-cluster, please specify LeaderElectionNamespace")) @@ -316,7 +319,7 @@ var _ = Describe("manger.Manager", func() { // ConfigMap lock to a controller-runtime version that has this new default. Many users of controller-runtime skip // versions, so we should be extremely conservative here. It("should default to ConfigMapsLeasesResourceLock", func() { - m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "my-ns"}) + m, err := New(context.Background(), cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "my-ns"}) Expect(m).ToNot(BeNil()) Expect(err).ToNot(HaveOccurred()) cm, ok := m.(*controllerManager) @@ -330,7 +333,7 @@ var _ = Describe("manger.Manager", func() { }) It("should use the specified ResourceLock", func() { - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ LeaderElection: true, LeaderElectionResourceLock: resourcelock.LeasesResourceLock, LeaderElectionID: "controller-runtime", @@ -347,7 +350,7 @@ var _ = Describe("manger.Manager", func() { It("should create a listener for the metrics if a valid address is provided", func() { var listener net.Listener - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ MetricsBindAddress: ":0", newMetricsListener: func(addr string) (net.Listener, error) { var err error @@ -366,7 +369,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).ShouldNot(HaveOccurred()) var listener net.Listener - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ MetricsBindAddress: ln.Addr().String(), newMetricsListener: func(addr string) (net.Listener, error) { var err error @@ -383,7 +386,7 @@ var _ = Describe("manger.Manager", func() { It("should create a listener for the health probes if a valid address is provided", func() { var listener net.Listener - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ HealthProbeBindAddress: ":0", newHealthProbeListener: func(addr string) (net.Listener, error) { var err error @@ -402,7 +405,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).ShouldNot(HaveOccurred()) var listener net.Listener - m, err := New(cfg, Options{ + m, err := New(context.Background(), cfg, Options{ HealthProbeBindAddress: ln.Addr().String(), newHealthProbeListener: func(addr string) (net.Listener, error) { var err error @@ -421,20 +424,20 @@ var _ = Describe("manger.Manager", func() { Describe("Start", func() { var startSuite = func(options Options, callbacks ...func(Manager)) { It("should Start each Component", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } var wgRunnableStarted sync.WaitGroup wgRunnableStarted.Add(2) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() wgRunnableStarted.Done() return nil }))).To(Succeed()) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() wgRunnableStarted.Done() return nil @@ -452,7 +455,7 @@ var _ = Describe("manger.Manager", func() { }) It("should stop when stop is called", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -465,14 +468,14 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error if it can't start the cache", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.startCache = func(stop <-chan struct{}) error { + mgr.startCache = func(context.Context) error { return fmt.Errorf("expected error") } Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error"))) @@ -481,24 +484,24 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error if any Components fail to Start", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() - <-s + <-ctx.Done() return nil }))).To(Succeed()) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() return fmt.Errorf("expected error") }))).To(Succeed()) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() return nil }))).To(Succeed()) @@ -512,34 +515,34 @@ var _ = Describe("manger.Manager", func() { }) It("should wait for runnables to stop", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } var lock sync.Mutex - runnableDoneCount := 0 + var runnableDoneCount int64 runnableDoneFunc := func() { lock.Lock() defer lock.Unlock() - runnableDoneCount++ + atomic.AddInt64(&runnableDoneCount, 1) } var wgRunnableRunning sync.WaitGroup wgRunnableRunning.Add(2) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { wgRunnableRunning.Done() defer GinkgoRecover() defer runnableDoneFunc() - <-s + <-ctx.Done() return nil }))).To(Succeed()) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { wgRunnableRunning.Done() defer GinkgoRecover() defer runnableDoneFunc() - <-s + <-ctx.Done() time.Sleep(300 * time.Millisecond) //slow closure simulation return nil }))).To(Succeed()) @@ -550,9 +553,12 @@ var _ = Describe("manger.Manager", func() { var wgManagerRunning sync.WaitGroup wgManagerRunning.Add(1) go func() { + defer GinkgoRecover() defer wgManagerRunning.Done() Expect(m.Start(s)).NotTo(HaveOccurred()) - Expect(runnableDoneCount).To(Equal(2)) + Eventually(func() int64 { + return atomic.LoadInt64(&runnableDoneCount) + }).Should(BeEquivalentTo(2)) }() wgRunnableRunning.Wait() close(s) @@ -562,7 +568,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error if any Components fail to Start and wait for runnables to stop", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -576,16 +582,16 @@ var _ = Describe("manger.Manager", func() { runnableDoneCount++ } - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() defer runnableDoneFunc() return fmt.Errorf("expected error") }))).To(Succeed()) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() defer runnableDoneFunc() - <-s + <-ctx.Done() return nil }))).To(Succeed()) @@ -596,7 +602,7 @@ var _ = Describe("manger.Manager", func() { }) It("should refuse to add runnable if stop procedure is already engaged", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -605,10 +611,10 @@ var _ = Describe("manger.Manager", func() { var wgRunnableRunning sync.WaitGroup wgRunnableRunning.Add(1) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { wgRunnableRunning.Done() defer GinkgoRecover() - <-s + <-ctx.Done() return nil }))).To(Succeed()) @@ -619,7 +625,7 @@ var _ = Describe("manger.Manager", func() { wgRunnableRunning.Wait() close(s) time.Sleep(100 * time.Millisecond) // give some time for the stop chan closure to be caught by the manager - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() return nil }))).NotTo(Succeed()) @@ -628,19 +634,19 @@ var _ = Describe("manger.Manager", func() { }) It("should return both runnables and stop errors when both error", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond - Expect(m.Add(RunnableFunc(func(_ <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { return runnableError{} }))) testDone := make(chan struct{}) defer close(testDone) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { - <-s + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + <-ctx.Done() timer := time.NewTimer(30 * time.Second) defer timer.Stop() select { @@ -661,20 +667,20 @@ var _ = Describe("manger.Manager", func() { }) It("should return only stop errors if runnables dont error", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { - <-s + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + <-ctx.Done() return nil }))) testDone := make(chan struct{}) defer close(testDone) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { - <-s + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + <-ctx.Done() timer := time.NewTimer(30 * time.Second) defer timer.Stop() select { @@ -701,12 +707,12 @@ var _ = Describe("manger.Manager", func() { }) It("should return only runnables error if stop doesn't error", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } - Expect(m.Add(RunnableFunc(func(_ <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { return runnableError{} }))) err = m.Start(make(chan struct{})) @@ -719,7 +725,7 @@ var _ = Describe("manger.Manager", func() { }) It("should not wait for runnables if gracefulShutdownTimeout is 0", func(done Done) { - m, err := New(cfg, options) + m, err := New(context.Background(), cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -727,8 +733,8 @@ var _ = Describe("manger.Manager", func() { m.(*controllerManager).gracefulShutdownTimeout = time.Duration(0) runnableStopped := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { - <-s + Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + <-ctx.Done() time.Sleep(100 * time.Millisecond) close(runnableStopped) return nil @@ -793,7 +799,7 @@ var _ = Describe("manger.Manager", func() { It("should stop serving metrics when stop is called", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) s := make(chan struct{}) @@ -820,7 +826,7 @@ var _ = Describe("manger.Manager", func() { It("should serve metrics endpoint", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) s := make(chan struct{}) @@ -839,7 +845,7 @@ var _ = Describe("manger.Manager", func() { It("should not serve anything other than metrics endpoint by default", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) s := make(chan struct{}) @@ -866,7 +872,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) opts.MetricsBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) s := make(chan struct{}) @@ -897,7 +903,7 @@ var _ = Describe("manger.Manager", func() { It("should serve extra endpoints", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) err = m.AddMetricsExtraHandler("/debug", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -954,7 +960,7 @@ var _ = Describe("manger.Manager", func() { It("should stop serving health probes when stop is called", func(done Done) { opts.HealthProbeBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) s := make(chan struct{}) @@ -981,7 +987,7 @@ var _ = Describe("manger.Manager", func() { It("should serve readiness endpoint", func(done Done) { opts.HealthProbeBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) res := fmt.Errorf("not ready yet") @@ -1032,7 +1038,7 @@ var _ = Describe("manger.Manager", func() { It("should serve liveness endpoint", func(done Done) { opts.HealthProbeBindAddress = ":0" - m, err := New(cfg, opts) + m, err := New(context.Background(), cfg, opts) Expect(err).NotTo(HaveOccurred()) res := fmt.Errorf("not alive") @@ -1085,14 +1091,14 @@ var _ = Describe("manger.Manager", func() { Describe("Add", func() { It("should immediately start the Component if the Manager has already Started another Component", func(done Done) { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) // Add one component before starting c1 := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() close(c1) return nil @@ -1112,7 +1118,7 @@ var _ = Describe("manger.Manager", func() { // Add another component after starting c2 := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() close(c2) return nil @@ -1124,7 +1130,7 @@ var _ = Describe("manger.Manager", func() { }) It("should immediately start the Component if the Manager has already Started", func(done Done) { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1142,7 +1148,7 @@ var _ = Describe("manger.Manager", func() { }).Should(BeTrue()) c1 := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() close(c1) return nil @@ -1153,14 +1159,14 @@ var _ = Describe("manger.Manager", func() { }) It("should fail if SetFields fails", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.Add(&failRec{})).To(HaveOccurred()) }) }) Describe("SetFields", func() { It("should inject field values", func(done Done) { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1244,6 +1250,7 @@ var _ = Describe("manger.Manager", func() { }, }) Expect(err).To(Equal(expected)) + err = m.SetFields(&injectable{ stop: func(<-chan struct{}) error { return expected @@ -1257,7 +1264,7 @@ var _ = Describe("manger.Manager", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) s := make(chan struct{}) @@ -1273,7 +1280,7 @@ var _ = Describe("manger.Manager", func() { It("should not leak goroutines if the default event broadcaster is used & events are emitted", func() { currentGRs := goleak.IgnoreCurrent() - m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) + m, err := New(context.Background(), cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) Expect(err).NotTo(HaveOccurred()) By("adding a runnable that emits an event") @@ -1281,7 +1288,7 @@ var _ = Describe("manger.Manager", func() { ns.Name = "default" recorder := m.GetEventRecorderFor("rock-and-roll") - Expect(m.Add(RunnableFunc(func(_ <-chan struct{}) error { + Expect(m.Add(RunnableFunc(func(_ context.Context) error { recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah") return nil }))).To(Succeed()) @@ -1317,7 +1324,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the Config", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1325,7 +1332,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the Client", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1333,7 +1340,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the Scheme", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1341,7 +1348,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the FieldIndexer", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1349,12 +1356,12 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the EventRecorder", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { - m, err := New(cfg, Options{}) + m, err := New(context.Background(), cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.GetAPIReader()).NotTo(BeNil()) }) @@ -1369,7 +1376,7 @@ func (*failRec) Reconcile(context.Context, reconcile.Request) (reconcile.Result, return reconcile.Result{}, nil } -func (*failRec) Start(<-chan struct{}) error { +func (*failRec) Start(context.Context) error { return nil } diff --git a/pkg/scheme/scheme.go b/pkg/scheme/scheme.go index 18a59fdafe..9dc93a9b21 100644 --- a/pkg/scheme/scheme.go +++ b/pkg/scheme/scheme.go @@ -46,7 +46,7 @@ limitations under the License. // } // // func main() { -// mgr := controllers.NewManager(controllers.GetConfigOrDie(), manager.Options{ +// mgr := controllers.NewManager(context.Background(), controllers.GetConfigOrDie(), manager.Options{ // Scheme: scheme, // }) // // ... diff --git a/pkg/source/source.go b/pkg/source/source.go index d670b8ce05..5eff2177b5 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -54,14 +54,14 @@ const ( type Source interface { // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. - Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error + Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error } // SyncingSource is a source that needs syncing prior to being usable. The controller // will call its WaitForSync prior to starting workers. type SyncingSource interface { Source - WaitForSync(stop <-chan struct{}) error + WaitForSync(ctx context.Context) error } // NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used @@ -75,13 +75,13 @@ type kindWithCache struct { kind Kind } -func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface, +func (ks *kindWithCache) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { - return ks.kind.Start(handler, queue, prct...) + return ks.kind.Start(ctx, handler, queue, prct...) } -func (ks *kindWithCache) WaitForSync(stop <-chan struct{}) error { - return ks.kind.WaitForSync(stop) +func (ks *kindWithCache) WaitForSync(ctx context.Context) error { + return ks.kind.WaitForSync(ctx) } // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create) @@ -97,7 +97,7 @@ var _ SyncingSource = &Kind{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface, +func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { // Type should have been specified by the user. @@ -111,7 +111,7 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting } // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.cache.GetInformer(context.TODO(), ks.Type) + i, err := ks.cache.GetInformer(ctx, ks.Type) if err != nil { if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { log.Error(err, "if kind is a CRD, it should be installed before calling Start", @@ -132,8 +132,8 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. -func (ks *Kind) WaitForSync(stop <-chan struct{}) error { - if !ks.cache.WaitForCacheSync(stop) { +func (ks *Kind) WaitForSync(ctx context.Context) error { + if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here return errors.New("cache did not sync") } @@ -195,6 +195,7 @@ func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error { // Start implements Source and should only be called by the Controller. func (cs *Channel) Start( + ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { @@ -218,7 +219,7 @@ func (cs *Channel) Start( cs.once.Do(func() { // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source - go cs.syncLoop() + go cs.syncLoop(ctx) }) go func() { @@ -266,10 +267,10 @@ func (cs *Channel) distribute(evt event.GenericEvent) { } } -func (cs *Channel) syncLoop() { +func (cs *Channel) syncLoop(ctx context.Context) { for { select { - case <-cs.stop: + case <-ctx.Done(): // Close destination channels cs.doStop() return @@ -289,7 +290,7 @@ var _ Source = &Informer{} // Start is internal and should be called only by the Controller to register an EventHandler with the Informer // to enqueue reconcile.Requests. -func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface, +func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { // Informer should have been specified by the user. @@ -308,12 +309,12 @@ func (is *Informer) String() string { var _ Source = Func(nil) // Func is a function that implements Source -type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error +type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error // Start implements Source -func (f Func) Start(evt handler.EventHandler, queue workqueue.RateLimitingInterface, +func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface, pr ...predicate.Predicate) error { - return f(evt, queue, pr...) + return f(ctx, evt, queue, pr...) } func (f Func) String() string { diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 9d5d74f86b..2a19729097 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -17,7 +17,6 @@ limitations under the License. package source_test import ( - "context" "fmt" "time" @@ -44,7 +43,6 @@ var _ = Describe("Source", func() { var c1, c2 chan interface{} var ns string count := 0 - ctx := context.TODO() BeforeEach(func(done Done) { // Create the namespace for the test @@ -133,8 +131,8 @@ var _ = Describe("Source", func() { handler2 := newHandler(c2) // Create 2 instances - Expect(instance1.Start(handler1, q)).To(Succeed()) - Expect(instance2.Start(handler2, q)).To(Succeed()) + Expect(instance1.Start(ctx, handler1, q)).To(Succeed()) + Expect(instance2.Start(ctx, handler2, q)).To(Succeed()) By("Creating a Deployment and expecting the CreateEvent.") created, err = client.Create(ctx, deployment, metav1.CreateOptions{}) @@ -255,7 +253,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Informer{Informer: depInformer} - err := instance.Start(handler.Funcs{ + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() var err error @@ -297,7 +295,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Informer{Informer: depInformer} - err = instance.Start(handler.Funcs{ + err = instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { }, UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { @@ -335,7 +333,7 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Informer{Informer: depInformer} - err := instance.Start(handler.Funcs{ + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { }, UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) { diff --git a/pkg/source/source_suite_test.go b/pkg/source/source_suite_test.go index ae1c9d5f9c..7be654bf0a 100644 --- a/pkg/source/source_suite_test.go +++ b/pkg/source/source_suite_test.go @@ -17,6 +17,7 @@ limitations under the License. package source_test import ( + "context" "testing" . "github.com/onsi/ginkgo" @@ -40,10 +41,11 @@ var testenv *envtest.Environment var config *rest.Config var clientset *kubernetes.Clientset var icache cache.Cache -var stop chan struct{} +var ctx context.Context +var cancel context.CancelFunc var _ = BeforeSuite(func(done Done) { - stop = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) testenv = &envtest.Environment{} @@ -60,14 +62,14 @@ var _ = BeforeSuite(func(done Done) { go func() { defer GinkgoRecover() - Expect(icache.Start(stop)).NotTo(HaveOccurred()) + Expect(icache.Start(ctx)).NotTo(HaveOccurred()) }() close(done) }, 60) var _ = AfterSuite(func(done Done) { - close(stop) + cancel() Expect(testenv.Stop()).To(Succeed()) close(done) diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 742eae196e..1a0c6146d6 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -17,6 +17,7 @@ limitations under the License. package source_test import ( + "context" "fmt" . "github.com/onsi/ginkgo" @@ -68,7 +69,7 @@ var _ = Describe("Source", func() { Type: &corev1.Pod{}, } Expect(inject.CacheInto(ic, instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{ + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Expect(q2).To(Equal(q)) @@ -108,7 +109,7 @@ var _ = Describe("Source", func() { Type: &corev1.Pod{}, } Expect(instance.InjectCache(ic)).To(Succeed()) - err := instance.Start(handler.Funcs{ + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -156,7 +157,7 @@ var _ = Describe("Source", func() { Type: &corev1.Pod{}, } Expect(inject.CacheInto(ic, instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{ + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected DeleteEvent") @@ -189,7 +190,7 @@ var _ = Describe("Source", func() { It("should return an error from Start if informers were not injected", func(done Done) { instance := source.Kind{Type: &corev1.Pod{}} - err := instance.Start(nil, nil) + err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("must call CacheInto on Kind before calling Start")) @@ -199,7 +200,7 @@ var _ = Describe("Source", func() { It("should return an error from Start if a type was not provided", func(done Done) { instance := source.Kind{} Expect(instance.InjectCache(&informertest.FakeInformers{})).To(Succeed()) - err := instance.Start(nil, nil) + err := instance.Start(ctx, nil, nil) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("must specify Kind.Type")) @@ -227,7 +228,7 @@ var _ = Describe("Source", func() { Type: &corev1.Pod{}, } Expect(instance.InjectCache(ic)).To(Succeed()) - err := instance.Start(handler.Funcs{}, q) + err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).To(HaveOccurred()) close(done) @@ -259,37 +260,40 @@ var _ = Describe("Source", func() { It("should be called from Start", func(done Done) { run := false instance := source.Func(func( + context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error { run = true return nil }) - Expect(instance.Start(nil, nil)).NotTo(HaveOccurred()) + Expect(instance.Start(ctx, nil, nil)).NotTo(HaveOccurred()) Expect(run).To(BeTrue()) expected := fmt.Errorf("expected error: Func") instance = source.Func(func( + context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error { return expected }) - Expect(instance.Start(nil, nil)).To(Equal(expected)) + Expect(instance.Start(ctx, nil, nil)).To(Equal(expected)) close(done) }) }) Describe("Channel", func() { - var stop chan struct{} + var ctx context.Context + var cancel context.CancelFunc var ch chan event.GenericEvent BeforeEach(func() { - stop = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) ch = make(chan event.GenericEvent) }) AfterEach(func() { - close(stop) + cancel() close(ch) }) @@ -315,8 +319,8 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(stop, instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{ + Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -356,8 +360,8 @@ var _ = Describe("Source", func() { // Add a handler to get distribution blocked instance := &source.Channel{Source: ch} instance.DestBufferSize = 1 - Expect(inject.StopChannelInto(stop, instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{ + Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -415,9 +419,9 @@ var _ = Describe("Source", func() { // Add a handler to get distribution blocked instance := &source.Channel{Source: ch} instance.DestBufferSize = 1 - Expect(inject.StopChannelInto(stop, instance)).To(BeTrue()) + Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{ + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -445,15 +449,15 @@ var _ = Describe("Source", func() { It("should get error if no source specified", func(done Done) { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{ /*no source specified*/ } - Expect(inject.StopChannelInto(stop, instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{}, q) + Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source"))) close(done) }) It("should get error if no stop channel injected", func(done Done) { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} - err := instance.Start(handler.Funcs{}, q) + err := instance.Start(ctx, handler.Funcs{}, q) Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start"))) close(done) }) @@ -475,8 +479,8 @@ var _ = Describe("Source", func() { q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") instance := &source.Channel{Source: ch} - Expect(inject.StopChannelInto(stop, instance)).To(BeTrue()) - err := instance.Start(handler.Funcs{ + Expect(inject.StopChannelInto(ctx.Done(), instance)).To(BeTrue()) + err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") @@ -499,7 +503,7 @@ var _ = Describe("Source", func() { }, q) Expect(err).NotTo(HaveOccurred()) - err = instance.Start(handler.Funcs{ + err = instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { defer GinkgoRecover() Fail("Unexpected CreateEvent") diff --git a/pkg/webhook/conversion/testdata/main.go b/pkg/webhook/conversion/testdata/main.go index 70ac01c9d8..a3922da009 100644 --- a/pkg/webhook/conversion/testdata/main.go +++ b/pkg/webhook/conversion/testdata/main.go @@ -16,6 +16,7 @@ limitations under the License. package main import ( + "context" "flag" "os" @@ -53,7 +54,7 @@ func main() { ctrl.SetLogger(zap.Logger(true)) - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + mgr, err := ctrl.NewManager(context.Background(), ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, LeaderElection: enableLeaderElection, diff --git a/pkg/webhook/example_test.go b/pkg/webhook/example_test.go index b225fea89b..e27b03c2cb 100644 --- a/pkg/webhook/example_test.go +++ b/pkg/webhook/example_test.go @@ -46,7 +46,7 @@ func Example() { // Create a manager // Note: GetConfigOrDie will os.Exit(1) w/o any message if no kube-config can be found - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) + mgr, err := ctrl.NewManager(context.Background(), ctrl.GetConfigOrDie(), ctrl.Options{}) if err != nil { panic(err) } diff --git a/pkg/webhook/internal/certwatcher/certwatcher.go b/pkg/webhook/internal/certwatcher/certwatcher.go index bd797fd738..d681ef2a6b 100644 --- a/pkg/webhook/internal/certwatcher/certwatcher.go +++ b/pkg/webhook/internal/certwatcher/certwatcher.go @@ -17,6 +17,7 @@ limitations under the License. package certwatcher import ( + "context" "crypto/tls" "sync" @@ -69,7 +70,7 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, } // Start starts the watch on the certificate and key files. -func (cw *CertWatcher) Start(stopCh <-chan struct{}) error { +func (cw *CertWatcher) Start(ctx context.Context) error { files := []string{cw.certPath, cw.keyPath} for _, f := range files { @@ -82,8 +83,8 @@ func (cw *CertWatcher) Start(stopCh <-chan struct{}) error { log.Info("Starting certificate watcher") - // Block until the stop channel is closed. - <-stopCh + // Block until the context is done. + <-ctx.Done() return cw.watcher.Close() } diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go index 2b91580c6a..721df490a0 100644 --- a/pkg/webhook/server.go +++ b/pkg/webhook/server.go @@ -172,7 +172,7 @@ func instrumentedHook(path string, hookRaw http.Handler) http.Handler { // Start runs the server. // It will install the webhook related resources depend on the server configuration. -func (s *Server) Start(stop <-chan struct{}) error { +func (s *Server) Start(ctx context.Context) error { s.defaultingOnce.Do(s.setDefaults) baseHookLog := log.WithName("webhooks") @@ -187,7 +187,7 @@ func (s *Server) Start(stop <-chan struct{}) error { } go func() { - if err := certWatcher.Start(stop); err != nil { + if err := certWatcher.Start(ctx); err != nil { log.Error(err, "certificate watcher error") } }() @@ -227,7 +227,7 @@ func (s *Server) Start(stop <-chan struct{}) error { idleConnsClosed := make(chan struct{}) go func() { - <-stop + <-ctx.Done() log.Info("shutting down webhook server") // TODO: use a context with reasonable timeout @@ -238,8 +238,7 @@ func (s *Server) Start(stop <-chan struct{}) error { close(idleConnsClosed) }() - err = srv.Serve(listener) - if err != nil && err != http.ErrServerClosed { + if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed { return err } diff --git a/pkg/webhook/server_test.go b/pkg/webhook/server_test.go index 56ddbbcc75..24c797cab8 100644 --- a/pkg/webhook/server_test.go +++ b/pkg/webhook/server_test.go @@ -17,6 +17,7 @@ limitations under the License. package webhook_test import ( + "context" "fmt" "io/ioutil" "net" @@ -31,14 +32,15 @@ import ( var _ = Describe("Webhook Server", func() { var ( - stop chan struct{} + ctx context.Context + ctxCancel context.CancelFunc testHostPort string client *http.Client server *webhook.Server ) BeforeEach(func() { - stop = make(chan struct{}) + ctx, ctxCancel = context.WithCancel(context.Background()) // closed in indivual tests differently servingOpts := envtest.WebhookInstallOptions{} @@ -69,7 +71,7 @@ var _ = Describe("Webhook Server", func() { go func() { defer GinkgoRecover() defer close(doneCh) - Expect(server.Start(stop)).To(Succeed()) + Expect(server.Start(ctx)).To(Succeed()) }() // wait till we can ping the server to start the test Eventually(func() error { @@ -109,7 +111,7 @@ var _ = Describe("Webhook Server", func() { Expect(func() { server.Register("/somepath", &testHandler{}) }).To(Panic()) - close(stop) + ctxCancel() Eventually(doneCh, "4s").Should(BeClosed()) }) @@ -126,7 +128,7 @@ var _ = Describe("Webhook Server", func() { return ioutil.ReadAll(resp.Body) }).Should(Equal([]byte("gadzooks!"))) - close(stop) + ctxCancel() Eventually(doneCh, "4s").Should(BeClosed()) }) @@ -137,7 +139,7 @@ var _ = Describe("Webhook Server", func() { Eventually(func() bool { return handler.injectedField }).Should(BeTrue()) - close(stop) + ctxCancel() Eventually(doneCh, "4s").Should(BeClosed()) }) }) @@ -151,7 +153,7 @@ var _ = Describe("Webhook Server", func() { }) AfterEach(func() { // wait for cleanup to happen - close(stop) + ctxCancel() Eventually(doneCh, "4s").Should(BeClosed()) })