Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Resource version incorrectly overridden for wfInformer list requests. Fixes #11948 #12133

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions util/unstructured/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ import (
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers, nil)
return NewFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers, nil, nil)
}

// NewFilteredUnstructuredInformer constructs a new informer for Unstructured type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
func NewFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListRequestListOptions internalinterfaces.TweakListOptionsFunc, tweakWatchRequestListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
ctx := context.Background()
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
if tweakListRequestListOptions != nil {
tweakListRequestListOptions(&options)
}
return client.Resource(resource).Namespace(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
if tweakWatchRequestListOptions != nil {
tweakWatchRequestListOptions(&options)
}
return client.Resource(resource).Namespace(namespace).Watch(ctx, options)
},
Expand Down
10 changes: 8 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
WithField("podCleanup", podCleanupWorkers).
Info("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers)
wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
Expand Down Expand Up @@ -817,7 +817,7 @@ func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error {
return nil
}

func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) {
func (wfc *WorkflowController) tweakListRequestListOptions(options *metav1.ListOptions) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly: keep them in step with

tweakWatchRequestListOptions(options)
options.ResourceVersion = ""

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a preference here.

labelSelector := labels.NewSelector().
Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
options.LabelSelector = labelSelector.String()
Expand All @@ -826,6 +826,12 @@ func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) {
options.ResourceVersion = ""
}

func (wfc *WorkflowController) tweakWatchRequestListOptions(options *metav1.ListOptions) {
labelSelector := labels.NewSelector().
Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
options.LabelSelector = labelSelector.String()
}

func getWfPriority(obj interface{}) (int32, time.Time) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl

// always compare to WorkflowController.Run to see what this block of code should be doing
{
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListOptions, indexers)
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets()
wfc.artGCTaskInformer = informerFactory.Argoproj().V1alpha1().WorkflowArtifactGCTasks()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
Expand Down
7 changes: 4 additions & 3 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ func (cc *Controller) Run(ctx context.Context) {
}).ForResource(schema.GroupVersionResource{Group: workflow.Group, Version: workflow.Version, Resource: workflow.CronWorkflowPlural})
cc.addCronWorkflowInformerHandler()

wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod, func(options *v1.ListOptions) {
wfInformerListOptionsFunc(options, cc.instanceId)
}, cache.Indexers{})
wfInformer := util.NewWorkflowInformer(cc.dynamicInterface, cc.managedNamespace, cronWorkflowResyncPeriod,
func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceId) },
func(options *v1.ListOptions) { wfInformerListOptionsFunc(options, cc.instanceId) },
cache.Indexers{})
go wfInformer.Run(ctx.Done())

cc.wfLister = util.NewWorkflowLister(wfInformer)
Expand Down
5 changes: 3 additions & 2 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import (
// objects. We no longer return WorkflowInformer due to:
// https://github.com/kubernetes/kubernetes/issues/57705
// https://github.com/argoproj/argo-workflows/issues/632
func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer {
func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time.Duration, tweakListRequestListOptions internalinterfaces.TweakListOptionsFunc, tweakWatchRequestListOptions internalinterfaces.TweakListOptionsFunc, indexers cache.Indexers) cache.SharedIndexInformer {
resource := schema.GroupVersionResource{
Group: workflow.Group,
Version: "v1alpha1",
Expand All @@ -71,7 +71,8 @@ func NewWorkflowInformer(dclient dynamic.Interface, ns string, resyncPeriod time
ns,
resyncPeriod,
indexers,
tweakListOptions,
tweakListRequestListOptions,
tweakWatchRequestListOptions,
)
return informer
}
Expand Down