diff --git a/workflow/common/common.go b/workflow/common/common.go
index 1838972cc49a..df259f50674e 100644
--- a/workflow/common/common.go
+++ b/workflow/common/common.go
@@ -52,6 +52,10 @@ const (
 	// AnnotationKeyArtifactGCStrategy is listed as an annotation on the Artifact GC Pod to identify
 	// the strategy whose artifacts are being deleted
 	AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"
+
+	// AnnotationKeyLastSeenVersion stores the resource version the workflow had when the controller last updated it
+	AnnotationKeyLastSeenVersion = workflow.WorkflowFullName + "/last-seen-version"
+
 	// AnnotationKeyPodGCStrategy is listed as an annotation on the Pod
 	// the strategy for the pod, in case the pod is orphaned from its workflow
 	AnnotationKeyPodGCStrategy = workflow.WorkflowFullName + "/pod-gc-strategy"
diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go
index 211b8f4a3089..66d57f282d5c 100644
--- a/workflow/controller/controller.go
+++ b/workflow/controller/controller.go
@@ -79,6 +79,13 @@ type recentCompletions struct {
 	mutex gosync.RWMutex
 }
 
+// lastSeenVersions records, per workflow UID, the resource version noted the last time the
+// controller wrote that workflow. All access to versions must hold mutex.
+type lastSeenVersions struct {
+	versions map[string]string
+	mutex    gosync.RWMutex
+}
+
 // WorkflowController is the controller for workflow resources
 type WorkflowController struct {
 	// namespace of the workflow controller
@@ -148,6 +155,8 @@ type WorkflowController struct {
 	executorPlugins map[string]map[string]*spec.Plugin // namespace -> name -> plugin
 
 	recentCompletions recentCompletions
+
+	lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version
 }
 
 const (
@@ -201,6 +210,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
 		eventRecorderManager:      events.NewEventRecorderManager(kubeclientset),
 		progressPatchTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
 		progressFileTickDuration:  env.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
+		lastSeenVersions: lastSeenVersions{
+			versions: make(map[string]string),
+			mutex:    gosync.RWMutex{},
+		},
 	}
 
 	if executorPlugins {
@@ -687,6 +700,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
 		return true
 	}
 
+	// The object's last-seen-version annotation does not match what this controller last recorded,
+	// so it is presumably a stale copy; requeue and retry later instead of reconciling it.
+	if wfc.isOutdated(un) {
+		log.WithField("key", key).Debug("Skipping outdated workflow event")
+		wfc.wfQueue.AddRateLimited(key)
+		return true
+	}
+
 	if !reconciliationNeeded(un) {
 		log.WithFields(log.Fields{"key": key}).Debug("Won't process Workflow since it's completed")
 		return true
@@ -903,6 +924,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
 				if !needed {
 					key, _ := cache.MetaNamespaceKeyFunc(un)
 					wfc.recordCompletedWorkflow(key)
+					wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un))
 				}
 				return needed
 			},
@@ -960,6 +982,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
 						// no need to add to the queue - this workflow is done
 						wfc.throttler.Remove(key)
 					}
+					wfc.deleteLastSeenVersionForObject(obj)
 				},
 			},
 		},
@@ -1300,3 +1323,45 @@ func (wfc *WorkflowController) IsLeader() bool {
 	// the wfc.wfInformer is nil if it is not the leader
 	return wfc.wfInformer != nil
 }
+
+// isOutdated reports whether wf is older than the last update this controller wrote for it.
+// The object is outdated when its last-seen-version annotation differs from the version recorded
+// in memory for its UID. Workflows with no recorded version are never considered outdated.
+func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
+	wfc.lastSeenVersions.mutex.RLock()
+	defer wfc.lastSeenVersions.mutex.RUnlock()
+	lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
+	// always process if not seen before
+	if !ok || lastSeenRV == "" {
+		return false
+	}
+	annotatedRV := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
+	return annotatedRV != lastSeenRV
+}
+
+// getLastSeenVersionKey returns the key under which wf is tracked in lastSeenVersions: its UID.
+func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string {
+	return string(wf.GetUID())
+}
+
+// deleteLastSeenVersionKey forgets the recorded version for key. It is a no-op for unknown keys.
+func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) {
+	wfc.lastSeenVersions.mutex.Lock()
+	defer wfc.lastSeenVersions.mutex.Unlock()
+	delete(wfc.lastSeenVersions.versions, key)
+}
+
+// deleteLastSeenVersionForObject forgets the recorded version for an object delivered to an
+// informer handler. Delete handlers can receive a cache.DeletedFinalStateUnknown tombstone instead
+// of the object itself, so the tombstone is unwrapped first; anything that is still not a
+// metav1.Object is ignored rather than triggering a type-assertion panic.
+func (wfc *WorkflowController) deleteLastSeenVersionForObject(obj interface{}) {
+	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+		obj = tombstone.Obj
+	}
+	wf, ok := obj.(metav1.Object)
+	if !ok {
+		return
+	}
+	wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(wf))
+}
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 9a63b32c2e71..c410baf2c74e 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -758,6 +758,9 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 		woc.log.WithError(err).Warn("error updating taskset")
 	}
 
+	// Annotate the workflow with the version being replaced so stale copies can be detected later.
+	oldRV := woc.wf.ResourceVersion
+	woc.updateLastSeenVersionAnnotation(oldRV)
 	wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
 	if err != nil {
 		woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err))
@@ -780,6 +783,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
 		woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes)
 	}
 
+	woc.updateLastSeenVersion(oldRV)
 	// The workflow returned from wfClient.Update doesn't have a TypeMeta associated
 	// with it, so copy from the original workflow.
 	woc.wf.TypeMeta = woc.orig.TypeMeta
@@ -861,9 +865,13 @@ func (woc *wfOperationCtx) writeBackToInformer() error {
 func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
 	woc.wf = woc.orig.DeepCopy()
 	woc.markWorkflowError(ctx, err)
+	oldRV := woc.wf.ResourceVersion
+	woc.updateLastSeenVersionAnnotation(oldRV)
 	_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
 	if err != nil {
 		woc.log.Warnf("Error updating workflow with size error: %v", err)
+	} else {
+		woc.updateLastSeenVersion(oldRV)
 	}
 }
 
@@ -4291,3 +4299,23 @@ func getChildNodeIdsRetried(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
 	}
 	return childrenIds
 }
+
+// updateLastSeenVersionAnnotation sets the last-seen-version annotation on woc.wf to value,
+// creating the annotations map first if the workflow has none.
+func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) {
+	if woc.wf.GetAnnotations() == nil {
+		woc.wf.SetAnnotations(make(map[string]string))
+	}
+	woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value
+}
+
+// updateLastSeenVersion records value in the controller's in-memory lastSeenVersions map under the
+// workflow's UID, allocating the map if it is nil.
+func (woc *wfOperationCtx) updateLastSeenVersion(value string) {
+	woc.controller.lastSeenVersions.mutex.Lock()
+	defer woc.controller.lastSeenVersions.mutex.Unlock()
+	if woc.controller.lastSeenVersions.versions == nil {
+		woc.controller.lastSeenVersions.versions = make(map[string]string)
+	}
+	woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value
+}