diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 0241d1d3f75f..dcd0ad6d7444 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + gosync "sync" "syscall" "time" @@ -69,6 +70,16 @@ import ( const maxAllowedStackDepth = 100 +type recentlyCompletedWorkflow struct { + key string + when time.Time +} + +type recentCompletions struct { + completions []recentlyCompletedWorkflow + mutex gosync.RWMutex +} + // WorkflowController is the controller for workflow resources type WorkflowController struct { // namespace of the workflow controller @@ -135,6 +146,8 @@ type WorkflowController struct { // Default is 3s and can be configured using the env var ARGO_PROGRESS_FILE_TICK_DURATION progressFileTickDuration time.Duration executorPlugins map[string]map[string]*spec.Plugin // namespace -> name -> plugin + + recentCompletions recentCompletions } const ( @@ -751,6 +764,11 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { return true } + if wf.Status.Phase != "" && wfc.checkRecentlyCompleted(wf.ObjectMeta.Name) { + log.WithFields(log.Fields{"name": wf.ObjectMeta.Name}).Warn("Cache: Rejecting recently deleted") + return true + } + // this will ensure we process every incomplete workflow once every 20m wfc.wfQueue.AddAfter(key, workflowResyncPeriod) @@ -851,18 +869,81 @@ func getWfPriority(obj interface{}) (int32, time.Time) { return int32(priority), un.GetCreationTimestamp().Time } +// 10 minutes in the past +const maxCompletedStoreTime = time.Second * -600 + +// This is a helper function for expiring old records of workflows +// completed more than maxCompletedStoreTime ago +func (wfc *WorkflowController) cleanCompletedWorkflowsRecord() { + cutoff := time.Now().Add(maxCompletedStoreTime) + removeIndex := -1 + wfc.recentCompletions.mutex.Lock() + defer wfc.recentCompletions.mutex.Unlock() + + for i, val := range wfc.recentCompletions.completions { + if val.when.After(cutoff) { + removeIndex = i - 1 + break + } + } + if removeIndex >= 0 { + wfc.recentCompletions.completions = wfc.recentCompletions.completions[removeIndex+1:] + } +} + +// Records a workflow as recently completed in the list +// if it isn't already in the list +func (wfc *WorkflowController) recordCompletedWorkflow(key string) { + if !wfc.checkRecentlyCompleted(key) { + wfc.recentCompletions.mutex.Lock() + defer wfc.recentCompletions.mutex.Unlock() + wfc.recentCompletions.completions = append(wfc.recentCompletions.completions, + recentlyCompletedWorkflow{ + key: key, + when: time.Now(), + }) + } +} + +// Returns true if the workflow given by key is in the recently completed +// list. Will perform expiry cleanup before checking. +func (wfc *WorkflowController) checkRecentlyCompleted(key string) bool { + wfc.cleanCompletedWorkflowsRecord() + recent := false + wfc.recentCompletions.mutex.RLock() + defer wfc.recentCompletions.mutex.RUnlock() + for _, val := range wfc.recentCompletions.completions { + if val.key == key { + recent = true + break + } + } + return recent +} + func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) { wfc.wfInformer.AddEventHandler( cache.FilteringResourceEventHandler{ + // FilterFunc is called for every operation affecting the + // informer cache and can be used to reject things from + // the cache. When they are rejected (this returns false) + // they will be deleted. FilterFunc: func(obj interface{}) bool { un, ok := obj.(*unstructured.Unstructured) if !ok { log.Warnf("Workflow FilterFunc: '%v' is not an unstructured", obj) return false } - return reconciliationNeeded(un) + needed := reconciliationNeeded(un) + if !needed { + key, _ := cache.MetaNamespaceKeyFunc(un) + wfc.recordCompletedWorkflow(key) + } + return needed }, Handler: cache.ResourceEventHandlerFuncs{ + // This function is called when a new to the informer object + // is to be added to the informer AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { @@ -872,6 +953,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) wfc.throttler.Add(key, priority, creation) } }, + // This function is called when an updated (we already know about this object) + // is to be updated in the informer UpdateFunc: func(old, new interface{}) { oldWf, newWf := old.(*unstructured.Unstructured), new.(*unstructured.Unstructured) // this check is very important to prevent doing many reconciliations we do not need to do @@ -885,12 +968,15 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) wfc.throttler.Add(key, priority, creation) } }, + // This function is called when an object is to be removed + // from the informer DeleteFunc: func(obj interface{}) { // IndexerInformer uses a delta queue, therefore for deletes we have to use this // key function. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { wfc.releaseAllWorkflowLocks(obj) + wfc.recordCompletedWorkflow(key) // no need to add to the queue - this workflow is done wfc.throttler.Remove(key) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 757846036ba2..680b1cf437d1 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -207,7 +207,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { } }() - woc.log.Info("Processing workflow") + woc.log.WithFields(log.Fields{"Phase": woc.wf.Status.Phase, "ResourceVersion": woc.wf.ObjectMeta.ResourceVersion}).Info("Processing workflow") // Set the Execute workflow spec for execution // ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault