From eaf0eac0132438b5d375fb218812e85ff9aec0d7 Mon Sep 17 00:00:00 2001 From: Eduardo Rodrigues Date: Fri, 28 Nov 2025 16:55:41 +0000 Subject: [PATCH 1/4] fix: workflow controller to detect stale workflows Signed-off-by: Eduardo Rodrigues --- workflow/common/common.go | 3 +++ workflow/controller/controller.go | 41 +++++++++++++++++++++++++++++++ workflow/controller/operator.go | 19 ++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/workflow/common/common.go b/workflow/common/common.go index 6352462a8198..29686767744d 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -57,6 +57,9 @@ const ( // the strategy whose artifacts are being deleted AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy" + // AnnotationKeyLastSeenVersion is the last seen version for the workflow + AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version" + // LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism. 
LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit" diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 5bf5d5dadc86..caf6bb6a5bb8 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -82,6 +82,11 @@ type recentCompletions struct { mutex gosync.RWMutex } +type lastSeenVersions struct { + versions map[string]string + mutex gosync.RWMutex +} + // WorkflowController is the controller for workflow resources type WorkflowController struct { // namespace of the workflow controller @@ -153,6 +158,8 @@ type WorkflowController struct { recentCompletions recentCompletions // lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled lastUnreconciledWorkflows map[string]*wfv1.Workflow + + lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version } const ( @@ -205,6 +212,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli eventRecorderManager: events.NewEventRecorderManager(kubeclientset), progressPatchTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute), progressFileTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second), + lastSeenVersions: lastSeenVersions{ + versions: make(map[string]string), + mutex: gosync.RWMutex{}, + }, } if executorPlugins { @@ -724,6 +735,12 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } + if wfc.isOutdated(un) { + logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event") + wfc.wfQueue.AddRateLimited(key) + return true + } + if !reconciliationNeeded(un) { logger.WithField("key", key).Debug(ctx, "Won't process Workflow since it's completed") return true @@ -946,6 +963,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) if !needed { key, _ := cache.MetaNamespaceKeyFunc(un) 
wfc.recordCompletedWorkflow(key) + wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un)) } return needed }, @@ -1003,6 +1021,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) // no need to add to the queue - this workflow is done wfc.throttler.Remove(key) } + wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(obj.(*unstructured.Unstructured))) }, }, }, @@ -1346,3 +1365,25 @@ func (wfc *WorkflowController) IsLeader() bool { // the wfc.wfInformer is nil if it is not the leader return wfc.wfInformer != nil } + +func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool { + wfc.lastSeenVersions.mutex.RLock() + defer wfc.lastSeenVersions.mutex.RUnlock() + lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)] + // always process if not seen before + if !ok || lastSeenRV == "" { + return false + } + annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] + return annotations != lastSeenRV +} + +func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string { + return string(wf.GetUID()) +} + +func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) { + wfc.lastSeenVersions.mutex.Lock() + defer wfc.lastSeenVersions.mutex.Unlock() + delete(wfc.lastSeenVersions.versions, key) +} diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 07b0632eddf0..319536d9102e 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -761,6 +761,8 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { woc.log.WithError(err).Warn(ctx, "error updating taskset") } + oldRV := woc.wf.ResourceVersion + woc.updateLastSeenVersionAnnotation(oldRV) wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) if err != nil { woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow") @@ -784,6 +786,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx 
context.Context) { woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes) } + woc.updateLastSeenVersion(oldRV) // The workflow returned from wfClient.Update doesn't have a TypeMeta associated // with it, so copy from the original workflow. woc.wf.TypeMeta = woc.orig.TypeMeta @@ -859,9 +862,12 @@ func (woc *wfOperationCtx) writeBackToInformer() error { func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) { woc.wf = woc.orig.DeepCopy() woc.markWorkflowError(ctx, err) + oldRV := woc.wf.ResourceVersion _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) if err != nil { woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error") + } else { + woc.updateLastSeenVersion(oldRV) } } @@ -4393,3 +4399,16 @@ func (woc *wfOperationCtx) setNodeDisplayName(ctx context.Context, node *wfv1.No newNode.DisplayName = displayName woc.wf.Status.Nodes.Set(ctx, nodeID, *newNode) } + +func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) { + if woc.wf.GetAnnotations() == nil { + woc.wf.Annotations = make(map[string]string) + } + woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value +} + +func (woc *wfOperationCtx) updateLastSeenVersion(value string) { + woc.controller.lastSeenVersions.mutex.Lock() + defer woc.controller.lastSeenVersions.mutex.Unlock() + woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value +} From bc052a8373d75fd298d8c16dea80b340315714be Mon Sep 17 00:00:00 2001 From: Eduardo Rodrigues Date: Fri, 28 Nov 2025 18:10:29 +0000 Subject: [PATCH 2/4] fix: safe map assignment Signed-off-by: Eduardo Rodrigues --- workflow/controller/operator.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 319536d9102e..30361ad979ed 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -4402,7 +4402,7 @@ 
func (woc *wfOperationCtx) setNodeDisplayName(ctx context.Context, node *wfv1.No func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) { if woc.wf.GetAnnotations() == nil { - woc.wf.Annotations = make(map[string]string) + woc.wf.SetAnnotations(make(map[string]string)) } woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value } @@ -4410,5 +4410,8 @@ func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) { func (woc *wfOperationCtx) updateLastSeenVersion(value string) { woc.controller.lastSeenVersions.mutex.Lock() defer woc.controller.lastSeenVersions.mutex.Unlock() + if woc.controller.lastSeenVersions.versions == nil { + woc.controller.lastSeenVersions.versions = make(map[string]string) + } woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value } From c97f3d071012634114f5773ed2aa1e1ee1b0f60b Mon Sep 17 00:00:00 2001 From: Eduardo Rodrigues Date: Wed, 10 Dec 2025 18:29:15 +0000 Subject: [PATCH 3/4] docs: update AnnotationKeyLastSeenVersion comment Signed-off-by: Eduardo Rodrigues --- workflow/common/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/common/common.go b/workflow/common/common.go index 29686767744d..5542d17acb3a 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -57,7 +57,7 @@ const ( // the strategy whose artifacts are being deleted AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy" - // AnnotationKeyLastSeenVersion is the last seen version for the workflow + // AnnotationKeyLastSeenVersion stores the resourceVersion the controller read from the workflow just before its most recent successful update; the controller compares it with its in-memory record to skip stale informer events AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version" // LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism. 
From 917bdab43a971cc6e786b842696745c74b268e6a Mon Sep 17 00:00:00 2001 From: Eduardo Rodrigues Date: Wed, 10 Dec 2025 18:31:37 +0000 Subject: [PATCH 4/4] fix: set the last-seen-version annotation before calling Update in persistWorkflowSizeLimitErr Signed-off-by: Eduardo Rodrigues --- workflow/controller/operator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 30361ad979ed..9faa6b68a5f2 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -863,6 +863,7 @@ func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfCl woc.wf = woc.orig.DeepCopy() woc.markWorkflowError(ctx, err) oldRV := woc.wf.ResourceVersion + woc.updateLastSeenVersionAnnotation(oldRV) _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) if err != nil { woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error")