From 6d9c6d1653856bd1e4c8e25bd744c4ad33b0e87d Mon Sep 17 00:00:00 2001 From: Eduardo Rodrigues Date: Tue, 20 Jan 2026 08:58:17 +0000 Subject: [PATCH] fix: workflow controller to detect stale workflows (#15090) Co-authored-by: Alan Clucas (cherry picked from commit b7670b6789577454a8478e8eb5e8d1b9529546dc) Signed-off-by: Eduardo Rodrigues Signed-off-by: Alan Clucas --- workflow/common/common.go | 3 +++ workflow/controller/controller.go | 41 +++++++++++++++++++++++++++++++ workflow/controller/operator.go | 23 +++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/workflow/common/common.go b/workflow/common/common.go index 03ca2db607d6..89d805036488 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -53,6 +53,9 @@ const ( // the strategy whose artifacts are being deleted AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy" + // AnnotationKeyLastSeenVersion stores the resource version the workflow had immediately before the controller's last successful update, so stale copies of the workflow can be detected + AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version" + // LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism.
LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit" diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 2aa3a30ab019..0bbb9c2c9fa3 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -81,6 +81,11 @@ type recentCompletions struct { mutex gosync.RWMutex } +type lastSeenVersions struct { + versions map[string]string + mutex gosync.RWMutex +} + // WorkflowController is the controller for workflow resources type WorkflowController struct { // namespace of the workflow controller @@ -152,6 +157,8 @@ type WorkflowController struct { recentCompletions recentCompletions // lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled lastUnreconciledWorkflows map[string]*wfv1.Workflow + + lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version } const ( @@ -205,6 +212,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli eventRecorderManager: events.NewEventRecorderManager(kubeclientset), progressPatchTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute), progressFileTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second), + lastSeenVersions: lastSeenVersions{ + versions: make(map[string]string), + mutex: gosync.RWMutex{}, + }, } if executorPlugins { @@ -708,6 +719,12 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } + if wfc.isOutdated(un) { + log.WithField("key", key).Debug("Skipping outdated workflow event") + wfc.wfQueue.AddRateLimited(key) + return true + } + if !reconciliationNeeded(un) { log.WithFields(log.Fields{"key": key}).Debug("Won't process Workflow since it's completed") return true @@ -924,6 +941,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) if !needed { key, _ := cache.MetaNamespaceKeyFunc(un) wfc.recordCompletedWorkflow(key) + 
wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un)) } return needed }, @@ -981,6 +999,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) // no need to add to the queue - this workflow is done wfc.throttler.Remove(key) } + wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(obj.(*unstructured.Unstructured))) }, }, }, @@ -1321,3 +1340,25 @@ func (wfc *WorkflowController) IsLeader() bool { // the wfc.wfInformer is nil if it is not the leader return wfc.wfInformer != nil } + +func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool { + wfc.lastSeenVersions.mutex.RLock() + defer wfc.lastSeenVersions.mutex.RUnlock() + lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)] + // always process if not seen before + if !ok || lastSeenRV == "" { + return false + } + annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] + return annotations != lastSeenRV +} + +func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string { + return string(wf.GetUID()) +} + +func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) { + wfc.lastSeenVersions.mutex.Lock() + defer wfc.lastSeenVersions.mutex.Unlock() + delete(wfc.lastSeenVersions.versions, key) +} diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index d32c0f2679b3..3dba1ddcd5dc 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -758,6 +758,8 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { woc.log.WithError(err).Warn("error updating taskset") } + oldRV := woc.wf.ResourceVersion + woc.updateLastSeenVersionAnnotation(oldRV) wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) if err != nil { woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err)) @@ -780,6 +782,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes) } + 
woc.updateLastSeenVersion(oldRV) // The workflow returned from wfClient.Update doesn't have a TypeMeta associated // with it, so copy from the original workflow. woc.wf.TypeMeta = woc.orig.TypeMeta @@ -861,9 +864,13 @@ func (woc *wfOperationCtx) writeBackToInformer() error { func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) { woc.wf = woc.orig.DeepCopy() woc.markWorkflowError(ctx, err) + oldRV := woc.wf.ResourceVersion + woc.updateLastSeenVersionAnnotation(oldRV) _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) if err != nil { woc.log.Warnf("Error updating workflow with size error: %v", err) + } else { + woc.updateLastSeenVersion(oldRV) } } @@ -4431,3 +4438,19 @@ func (woc *wfOperationCtx) setNodeDisplayName(node *wfv1.NodeStatus, displayName newNode.DisplayName = displayName woc.wf.Status.Nodes.Set(nodeID, *newNode) } + +func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) { + if woc.wf.GetAnnotations() == nil { + woc.wf.SetAnnotations(make(map[string]string)) + } + woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value +} + +func (woc *wfOperationCtx) updateLastSeenVersion(value string) { + woc.controller.lastSeenVersions.mutex.Lock() + defer woc.controller.lastSeenVersions.mutex.Unlock() + if woc.controller.lastSeenVersions.versions == nil { + woc.controller.lastSeenVersions.versions = make(map[string]string) + } + woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value +}