diff --git a/util/unstructured/unstructured.go b/util/unstructured/unstructured.go index 7cc72e64a8c3..698bff0372a6 100644 --- a/util/unstructured/unstructured.go +++ b/util/unstructured/unstructured.go @@ -18,25 +18,25 @@ import ( // Always prefer using an informer factory to get a shared informer instead of getting an independent // one. This reduces memory footprint and number of connections to the server. func NewUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { - return NewFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers, nil) + return NewFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers, nil, nil) } // NewFilteredUnstructuredInformer constructs a new informer for Unstructured type. // Always prefer using an informer factory to get a shared informer instead of getting an independent // one. This reduces memory footprint and number of connections to the server. -func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { +func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListRequestListOptions internalinterfaces.TweakListOptionsFunc, tweakWatchRequestListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { ctx := context.Background() return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - if tweakListOptions != nil { - tweakListOptions(&options) + if tweakListRequestListOptions != nil { + tweakListRequestListOptions(&options) } return client.Resource(resource).Namespace(namespace).List(ctx, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - if tweakListOptions != nil { - tweakListOptions(&options) + if tweakWatchRequestListOptions != nil { + tweakWatchRequestListOptions(&options) } return client.Resource(resource).Namespace(namespace).Watch(ctx, options) }, diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 7dfc32719b03..a943471bb3ac 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -266,7 +266,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo WithField("podCleanup", podCleanupWorkers). Info("Current Worker Numbers") - wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers) + wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace) wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer() wfc.artGCTaskInformer = wfc.newArtGCTaskInformer() @@ -817,7 +817,7 @@ func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error { return nil } -func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) { +func (wfc *WorkflowController) tweakListRequestListOptions(options *metav1.ListOptions) { labelSelector := labels.NewSelector(). Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) options.LabelSelector = labelSelector.String() @@ -826,6 +826,12 @@ func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) { options.ResourceVersion = "" } +func (wfc *WorkflowController) tweakWatchRequestListOptions(options *metav1.ListOptions) { + labelSelector := labels.NewSelector(). + Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) + options.LabelSelector = labelSelector.String() +} + func getWfPriority(obj interface{}) (int32, time.Time) { un, ok := obj.(*unstructured.Unstructured) if !ok { diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index a89356c821b6..94942da6f9e2 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -259,7 +259,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl // always compare to WorkflowController.Run to see what this block of code should be doing { - wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListOptions, indexers) + wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers) wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets() wfc.artGCTaskInformer = informerFactory.Argoproj().V1alpha1().WorkflowArtifactGCTasks() wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer() diff --git a/workflow/cron/controller.go b/workflow/cron/controller.go index b9486a1d23bd..a041ad2792fe 100644 --- a/workflow/cron/controller.go +++ b/workflow/cron/controller.go @@ -103,9 +103,10 @@ func (cc *Controller) Run(ctx context.Context) { }).ForResource(schema.GroupVersionResource{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural}) cc.addCronWorkflowInformerHandler() - wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, func(options *v1.ListOptions) { - wfInformerListOptionsFunc(options, cc.instanceId) - }, cache.Indexers{}) + wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, + func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceId) }, + func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceId) }, + cache.Indexers{}) go wfInformer.Run(ctx.Done()) cc.wfLister = util.NewWorkflowLister(wfInformer) diff --git a/workflow/util/util.go b/workflow/util/util.go index 3083c4f63ed5..bb539f8ee8c8 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -59,7 +59,7 @@ import ( // objects. We no longer return WorkflowInformer due to: // https://github.com/kubernetes/kubernetes/issues/57705 // https://github.com/argoproj/argo-workflows/issues/632 -func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer { +func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListRequestListOptions internalinterfaces.TweakListOptionsFunc, tweakWatchRequestListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer { resource := schema.GroupVersionResource{ Group: workflow.Group, Version: "v1alpha1", @@ -71,7 +71,8 @@ func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time ns, resyncPeriod, indexers, - tweakListOptions, + tweakListRequestListOptions, + tweakWatchRequestListOptions, ) return informer }