diff --git a/workflow/common/common.go b/workflow/common/common.go
index 6352462a8198..5542d17acb3a 100644
--- a/workflow/common/common.go
+++ b/workflow/common/common.go
@@ -57,6 +57,9 @@ const (
 	// the strategy whose artifacts are being deleted
 	AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"
 
+	// AnnotationKeyLastSeenVersion stores the resource version the workflow had immediately before the controller's last update of it
+	AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version"
+
 	// LabelParallelismLimit is a label applied on namespace objects to control the per namespace parallelism.
 	LabelParallelismLimit = workflow.WorkflowFullName + "/parallelism-limit"
 
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index 2b91caa84a30..45313cc50907 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -82,6 +82,14 @@ type recentCompletions struct {
 	mutex gosync.RWMutex
 }
 
+// lastSeenVersions records, per workflow UID, the resource version the workflow had
+// immediately before the controller's last successful update of it.
+type lastSeenVersions struct {
+	// versions maps workflow UID to resource version; access it only while holding mutex
+	versions map[string]string
+	mutex    gosync.RWMutex
+}
+
 // WorkflowController is the controller for workflow resources
 type WorkflowController struct {
 	// namespace of the workflow controller
@@ -153,6 +161,8 @@ type WorkflowController struct {
 	recentCompletions recentCompletions
 	// lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled
 	lastUnreconciledWorkflows map[string]*wfv1.Workflow
+
+	lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version
 }
 
 const (
@@ -205,6 +215,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
 		eventRecorderManager: events.NewEventRecorderManager(kubeclientset),
 		progressPatchTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute),
 		progressFileTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second),
+		lastSeenVersions: lastSeenVersions{
+			versions: make(map[string]string),
+			mutex:    gosync.RWMutex{},
+		},
 	}
 
 	if executorPlugins {
@@ -724,6 +738,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
 		return true
 	}
 
+	// un's last-seen-version annotation does not match the version recorded at the
+	// controller's last successful update, so treat it as stale and retry later
+	if wfc.isOutdated(un) {
+		logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event")
+		wfc.wfQueue.AddRateLimited(key)
+		return true
+	}
+
 	if !reconciliationNeeded(un) {
 		logger.WithField("key", key).Debug(ctx, "Won't process Workflow since it's completed")
 		return true
@@ -949,6 +971,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
 				if !needed {
 					key, _ := cache.MetaNamespaceKeyFunc(un)
 					wfc.recordCompletedWorkflow(key)
+					wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un))
 				}
 				return needed
 			},
@@ -1006,6 +1029,15 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
 						// no need to add to the queue - this workflow is done
 						wfc.throttler.Remove(key)
 					}
+					// obj may be a cache.DeletedFinalStateUnknown tombstone rather than the workflow
+					// itself; unwrap it and use a checked assertion, since an unchecked one would panic
+					deleted := obj
+					if tombstone, ok := deleted.(cache.DeletedFinalStateUnknown); ok {
+						deleted = tombstone.Obj
+					}
+					if un, ok := deleted.(*unstructured.Unstructured); ok {
+						wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un))
+					}
 				},
 			},
 		},
@@ -1355,3 +1387,32 @@ func (wfc *WorkflowController) IsLeader() bool {
 	// the wfc.wfInformer is nil if it is not the leader
 	return wfc.wfInformer != nil
 }
+
+// isOutdated reports whether wf should be skipped as stale. It returns false
+// when no version has been recorded for the workflow's UID; otherwise it
+// returns true when wf's AnnotationKeyLastSeenVersion annotation differs from
+// the recorded version.
+func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
+	wfc.lastSeenVersions.mutex.RLock()
+	defer wfc.lastSeenVersions.mutex.RUnlock()
+	lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
+	// always process if not seen before
+	if !ok || lastSeenRV == "" {
+		return false
+	}
+	return wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] != lastSeenRV
+}
+
+// getLastSeenVersionKey returns the key under which wf is tracked in
+// lastSeenVersions: the workflow's UID.
+func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string {
+	return string(wf.GetUID())
+}
+
+// deleteLastSeenVersionKey removes key from lastSeenVersions; it is a no-op if
+// the key is absent.
+func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) {
+	wfc.lastSeenVersions.mutex.Lock()
+	defer wfc.lastSeenVersions.mutex.Unlock()
+	delete(wfc.lastSeenVersions.versions, key)
+}
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index e80d46f48dfa..1997d2771b59 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -761,6 +761,9 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 		woc.log.WithError(err).Warn(ctx, "error updating taskset")
 	}
 
+	// stamp the pre-update resource version on the workflow; isOutdated compares it with the recorded one
+	oldRV := woc.wf.ResourceVersion
+	woc.updateLastSeenVersionAnnotation(oldRV)
 	wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
 	if err != nil {
 		woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow")
@@ -784,6 +787,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 		woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes)
 	}
 
+	woc.updateLastSeenVersion(oldRV)
 	// The workflow returned from wfClient.Update doesn't have a TypeMeta associated
 	// with it, so copy from the original workflow.
 	woc.wf.TypeMeta = woc.orig.TypeMeta
@@ -866,9 +870,13 @@ func (woc *wfOperationCtx) writeBackToInformer() error {
 func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
 	woc.wf = woc.orig.DeepCopy()
 	woc.markWorkflowError(ctx, err)
+	oldRV := woc.wf.ResourceVersion
+	woc.updateLastSeenVersionAnnotation(oldRV)
 	_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
 	if err != nil {
 		woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error")
+		return
 	}
+	woc.updateLastSeenVersion(oldRV)
 }
 
@@ -4527,3 +4535,27 @@ func (woc *wfOperationCtx) setNodeDisplayName(ctx context.Context, node *wfv1.No
 	newNode.DisplayName = displayName
 	woc.wf.Status.Nodes.Set(ctx, nodeID, *newNode)
 }
+
+// updateLastSeenVersionAnnotation sets the AnnotationKeyLastSeenVersion
+// annotation on the in-memory workflow to value, creating the annotation map
+// if the workflow has none.
+func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) {
+	annotations := woc.wf.GetAnnotations()
+	if annotations == nil {
+		annotations = make(map[string]string)
+		woc.wf.SetAnnotations(annotations)
+	}
+	annotations[common.AnnotationKeyLastSeenVersion] = value
+}
+
+// updateLastSeenVersion records value as the last seen version for the
+// workflow's UID in the controller's lastSeenVersions map.
+func (woc *wfOperationCtx) updateLastSeenVersion(value string) {
+	woc.controller.lastSeenVersions.mutex.Lock()
+	defer woc.controller.lastSeenVersions.mutex.Unlock()
+	// NOTE(review): presumably guards controllers built without NewWorkflowController (e.g. in tests) - confirm
+	if woc.controller.lastSeenVersions.versions == nil {
+		woc.controller.lastSeenVersions.versions = make(map[string]string)
+	}
+	woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value
+}