Skip to content

Commit

Permalink
fix: completed workflow tracking (#12198)
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel authored and sarabala1979 committed Jan 8, 2024
1 parent c4251fa commit 3ecfe56
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 2 deletions.
88 changes: 87 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strconv"
gosync "sync"
"syscall"
"time"

Expand Down Expand Up @@ -69,6 +70,16 @@ import (

const maxAllowedStackDepth = 100

type recentlyCompletedWorkflow struct {
key string
when time.Time
}

type recentCompletions struct {
completions []recentlyCompletedWorkflow
mutex gosync.RWMutex
}

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
// namespace of the workflow controller
Expand Down Expand Up @@ -135,6 +146,8 @@ type WorkflowController struct {
// Default is 3s and can be configured using the env var ARGO_PROGRESS_FILE_TICK_DURATION
progressFileTickDuration time.Duration
executorPlugins map[string]map[string]*spec.Plugin // namespace -> name -> plugin

recentCompletions recentCompletions
}

const (
Expand Down Expand Up @@ -751,6 +764,11 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

if wf.Status.Phase != "" && wfc.checkRecentlyCompleted(wf.ObjectMeta.Name) {
log.WithFields(log.Fields{"name": wf.ObjectMeta.Name}).Warn("Cache: Rejecting recently deleted")
return true
}

// this will ensure we process every incomplete workflow once every 20m
wfc.wfQueue.AddAfter(key, workflowResyncPeriod)

Expand Down Expand Up @@ -851,18 +869,81 @@ func getWfPriority(obj interface{}) (int32, time.Time) {
return int32(priority), un.GetCreationTimestamp().Time
}

// 10 minutes in the past
const maxCompletedStoreTime = time.Second * -600

// This is a helper function for expiring old records of workflows
// completed more than maxCompletedStoreTime ago
func (wfc *WorkflowController) cleanCompletedWorkflowsRecord() {
cutoff := time.Now().Add(maxCompletedStoreTime)
removeIndex := -1
wfc.recentCompletions.mutex.Lock()
defer wfc.recentCompletions.mutex.Unlock()

for i, val := range wfc.recentCompletions.completions {
if val.when.After(cutoff) {
removeIndex = i - 1
break
}
}
if removeIndex >= 0 {
wfc.recentCompletions.completions = wfc.recentCompletions.completions[removeIndex+1:]
}
}

// Records a workflow as recently completed in the list
// if it isn't already in the list
func (wfc *WorkflowController) recordCompletedWorkflow(key string) {
if !wfc.checkRecentlyCompleted(key) {
wfc.recentCompletions.mutex.Lock()
defer wfc.recentCompletions.mutex.Unlock()
wfc.recentCompletions.completions = append(wfc.recentCompletions.completions,
recentlyCompletedWorkflow{
key: key,
when: time.Now(),
})
}
}

// Returns true if the workflow given by key is in the recently completed
// list. Will perform expiry cleanup before checking.
func (wfc *WorkflowController) checkRecentlyCompleted(key string) bool {
wfc.cleanCompletedWorkflowsRecord()
recent := false
wfc.recentCompletions.mutex.RLock()
defer wfc.recentCompletions.mutex.RUnlock()
for _, val := range wfc.recentCompletions.completions {
if val.key == key {
recent = true
break
}
}
return recent
}

func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) {
wfc.wfInformer.AddEventHandler(
cache.FilteringResourceEventHandler{
// FilterFunc is called for every operation affecting the
// informer cache and can be used to reject things from
// the cache. When they are rejected (this returns false)
// they will be deleted.
FilterFunc: func(obj interface{}) bool {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("Workflow FilterFunc: '%v' is not an unstructured", obj)
return false
}
return reconciliationNeeded(un)
needed := reconciliationNeeded(un)
if !needed {
key, _ := cache.MetaNamespaceKeyFunc(un)
wfc.recordCompletedWorkflow(key)
}
return needed
},
Handler: cache.ResourceEventHandlerFuncs{
// This function is called when a new to the informer object
// is to be added to the informer
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
Expand All @@ -872,6 +953,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
wfc.throttler.Add(key, priority, creation)
}
},
// This function is called when an updated (we already know about this object)
// is to be updated in the informer
UpdateFunc: func(old, new interface{}) {
oldWf, newWf := old.(*unstructured.Unstructured), new.(*unstructured.Unstructured)
// this check is very important to prevent doing many reconciliations we do not need to do
Expand All @@ -885,12 +968,15 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
wfc.throttler.Add(key, priority, creation)
}
},
// This function is called when an object is to be removed
// from the informer
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
wfc.releaseAllWorkflowLocks(obj)
wfc.recordCompletedWorkflow(key)
// no need to add to the queue - this workflow is done
wfc.throttler.Remove(key)
}
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}()

woc.log.Info("Processing workflow")
woc.log.WithFields(log.Fields{"Phase": woc.wf.Status.Phase, "ResourceVersion": woc.wf.ObjectMeta.ResourceVersion}).Info("Processing workflow")

// Set the Execute workflow spec for execution
// ExecWF is a runtime execution spec which merged from Wf, WFT and Wfdefault
Expand Down

0 comments on commit 3ecfe56

Please sign in to comment.