@@ -130,6 +130,10 @@ func (c *Controller) Start(stop <-chan struct{}) error {
130130 c .Queue = c .MakeQueue ()
131131 defer c .Queue .ShutDown () // needs to be outside the iife so that we shutdown after the stop channel is closed
132132
133+ // TODO: Propagate context from the Runnable interface, when we're ready to change the signature.
134+ ctx , cancel := context .WithCancel (context .TODO ())
135+ defer cancel ()
136+
133137 err := func () error {
134138 defer c .mu .Unlock ()
135139
@@ -170,8 +174,12 @@ func (c *Controller) Start(stop <-chan struct{}) error {
170174 // Launch workers to process resources
171175 c .Log .Info ("Starting workers" , "worker count" , c .MaxConcurrentReconciles )
172176 for i := 0 ; i < c .MaxConcurrentReconciles ; i ++ {
173- // Process work items
174- go wait .Until (c .worker , c .JitterPeriod , stop )
177+ // Run a worker thread that just dequeues items, processes them, and marks them done.
178+ // It enforces that the reconcileHandler is never invoked concurrently with the same object.
179+ go wait .Until (func () {
180+ for c .processNextWorkItem (ctx ) {
181+ }
182+ }, c .JitterPeriod , stop )
175183 }
176184
177185 c .Started = true
@@ -180,22 +188,16 @@ func (c *Controller) Start(stop <-chan struct{}) error {
180188 if err != nil {
181189 return err
182190 }
191+ context .WithCancel (context .Background ())
183192
184193 <- stop
185194 c .Log .Info ("Stopping workers" )
186195 return nil
187196}
188197
189- // worker runs a worker thread that just dequeues items, processes them, and marks them done.
190- // It enforces that the reconcileHandler is never invoked concurrently with the same object.
191- func (c * Controller ) worker () {
192- for c .processNextWorkItem () {
193- }
194- }
195-
196198// processNextWorkItem will read a single work item off the workqueue and
197199// attempt to process it, by calling the reconcileHandler.
198- func (c * Controller ) processNextWorkItem () bool {
200+ func (c * Controller ) processNextWorkItem (ctx context. Context ) bool {
199201 obj , shutdown := c .Queue .Get ()
200202 if shutdown {
201203 // Stop working
@@ -210,10 +212,10 @@ func (c *Controller) processNextWorkItem() bool {
210212 // period.
211213 defer c .Queue .Done (obj )
212214
213- return c .reconcileHandler (obj )
215+ return c .reconcileHandler (ctx , obj )
214216}
215217
216- func (c * Controller ) reconcileHandler (obj interface {}) bool {
218+ func (c * Controller ) reconcileHandler (ctx context. Context , obj interface {}) bool {
217219 // Update metrics after processing each item
218220 reconcileStartTS := time .Now ()
219221 defer func () {
@@ -233,7 +235,7 @@ func (c *Controller) reconcileHandler(obj interface{}) bool {
233235 }
234236
235237 log := c .Log .WithValues ("name" , req .Name , "namespace" , req .Namespace )
236- ctx : = logf .IntoContext (context . Background () , log )
238+ ctx = logf .IntoContext (ctx , log )
237239
238240 // RunInformersAndControllers the syncHandler, passing it the namespace/Name string of the
239241 // resource to be synced.
0 commit comments