-
Notifications
You must be signed in to change notification settings - Fork 0
[Review] fix: workflow controller to detect stale workflows #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,6 +82,11 @@ type recentCompletions struct { | |
| mutex gosync.RWMutex | ||
| } | ||
|
|
||
| type lastSeenVersions struct { | ||
| versions map[string]string | ||
| mutex gosync.RWMutex | ||
| } | ||
|
|
||
| // WorkflowController is the controller for workflow resources | ||
| type WorkflowController struct { | ||
| // namespace of the workflow controller | ||
|
|
@@ -153,6 +158,8 @@ type WorkflowController struct { | |
| recentCompletions recentCompletions | ||
| // lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled | ||
| lastUnreconciledWorkflows map[string]*wfv1.Workflow | ||
|
|
||
| lastSeenVersions lastSeenVersions // key: workflow UID, value: resource version | ||
| } | ||
|
|
||
| const ( | ||
|
|
@@ -205,6 +212,10 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli | |
| eventRecorderManager: events.NewEventRecorderManager(kubeclientset), | ||
| progressPatchTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressPatchTickDuration, 1*time.Minute), | ||
| progressFileTickDuration: env.LookupEnvDurationOr(ctx, common.EnvVarProgressFileTickDuration, 3*time.Second), | ||
| lastSeenVersions: lastSeenVersions{ | ||
| versions: make(map[string]string), | ||
| mutex: gosync.RWMutex{}, | ||
| }, | ||
| } | ||
|
|
||
| if executorPlugins { | ||
|
|
@@ -724,6 +735,12 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { | |
| return true | ||
| } | ||
|
|
||
| if wfc.isOutdated(un) { | ||
| logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event") | ||
| wfc.wfQueue.AddRateLimited(key) | ||
| return true | ||
| } | ||
|
|
||
| if !reconciliationNeeded(un) { | ||
| logger.WithField("key", key).Debug(ctx, "Won't process Workflow since it's completed") | ||
| return true | ||
|
|
@@ -946,6 +963,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) | |
| if !needed { | ||
| key, _ := cache.MetaNamespaceKeyFunc(un) | ||
| wfc.recordCompletedWorkflow(key) | ||
| wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un)) | ||
| } | ||
| return needed | ||
| }, | ||
|
|
@@ -1003,6 +1021,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) | |
| // no need to add to the queue - this workflow is done | ||
| wfc.throttler.Remove(key) | ||
| } | ||
| wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(obj.(*unstructured.Unstructured))) | ||
| }, | ||
| }, | ||
| }, | ||
|
|
@@ -1346,3 +1365,25 @@ func (wfc *WorkflowController) IsLeader() bool { | |
| // the wfc.wfInformer is nil if it is not the leader | ||
| return wfc.wfInformer != nil | ||
| } | ||
|
|
||
| func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool { | ||
| wfc.lastSeenVersions.mutex.RLock() | ||
| defer wfc.lastSeenVersions.mutex.RUnlock() | ||
| lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)] | ||
| // always process if not seen before | ||
| if !ok || lastSeenRV == "" { | ||
| return false | ||
| } | ||
| annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] | ||
| return annotations != lastSeenRV | ||
| } | ||
|
|
||
| func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string { | ||
| return string(wf.GetUID()) | ||
| } | ||
|
|
||
| func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) { | ||
| wfc.lastSeenVersions.mutex.Lock() | ||
| defer wfc.lastSeenVersions.mutex.Unlock() | ||
| delete(wfc.lastSeenVersions.versions, key) | ||
| } | ||
|
Comment on lines
+1369
to
+1389
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make isOutdated resilient to missing or externally modified annotations Current implementation: func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
wfc.lastSeenVersions.mutex.RLock()
defer wfc.lastSeenVersions.mutex.RUnlock()
lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
// always process if not seen before
if !ok || lastSeenRV == "" {
return false
}
annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
return annotations != lastSeenRV
}Concerns:
To make this more robust while preserving stale‑event protection, consider treating a missing or empty annotation as “we can’t safely apply staleness filtering” and either:
For example: func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
wfc.lastSeenVersions.mutex.RLock()
- lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
+ key := wfc.getLastSeenVersionKey(wf)
+ lastSeenRV, ok := wfc.lastSeenVersions.versions[key]
wfc.lastSeenVersions.mutex.RUnlock()
// always process if not seen before
if !ok || lastSeenRV == "" {
return false
}
- annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
- return annotations != lastSeenRV
+ ann := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
+ if ann == "" {
+ // Annotation missing or cleared; drop stale map entry to avoid wedging this workflow.
+ wfc.deleteLastSeenVersionKey(key)
+ return false
+ }
+ return ann != lastSeenRV
}This keeps the optimisation for genuine stale events while avoiding permanent starvation if the annotation is lost or touched by external tooling. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -761,6 +761,8 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { | |
| woc.log.WithError(err).Warn(ctx, "error updating taskset") | ||
| } | ||
|
|
||
| oldRV := woc.wf.ResourceVersion | ||
| woc.updateLastSeenVersionAnnotation(oldRV) | ||
| wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) | ||
|
Comment on lines
+764
to
766
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep annotation and in-memory last-seen map in sync on all persist paths The normal
However, the size-limit error path only updates the map: woc.wf = woc.orig.DeepCopy()
woc.markWorkflowError(ctx, err)
oldRV := woc.wf.ResourceVersion
_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
if err != nil {
...
} else {
woc.updateLastSeenVersion(oldRV)
}If the previous persisted object already had a To preserve the invariant that the map and annotation always match after any successful write, the size‑limit path should also set the annotation before the Suggested diff: func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
woc.wf = woc.orig.DeepCopy()
woc.markWorkflowError(ctx, err)
- oldRV := woc.wf.ResourceVersion
- _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
+ oldRV := woc.wf.ResourceVersion
+ woc.updateLastSeenVersionAnnotation(oldRV)
+ _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
if err != nil {
woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error")
} else {
woc.updateLastSeenVersion(oldRV)
}
}This keeps the annotation and Also applies to: 789-789, 865-871 🤖 Prompt for AI Agents |
||
| if err != nil { | ||
| woc.log.WithField("error", err).WithField("reason", apierr.ReasonForError(err)).Warn(ctx, "Error updating workflow") | ||
|
|
@@ -784,6 +786,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { | |
| woc.controller.hydrator.HydrateWithNodes(woc.wf, nodes) | ||
| } | ||
|
|
||
| woc.updateLastSeenVersion(oldRV) | ||
| // The workflow returned from wfClient.Update doesn't have a TypeMeta associated | ||
| // with it, so copy from the original workflow. | ||
| woc.wf.TypeMeta = woc.orig.TypeMeta | ||
|
|
@@ -859,9 +862,12 @@ func (woc *wfOperationCtx) writeBackToInformer() error { | |
| func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) { | ||
| woc.wf = woc.orig.DeepCopy() | ||
| woc.markWorkflowError(ctx, err) | ||
| oldRV := woc.wf.ResourceVersion | ||
| _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{}) | ||
| if err != nil { | ||
| woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error") | ||
| } else { | ||
| woc.updateLastSeenVersion(oldRV) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -4393,3 +4399,19 @@ func (woc *wfOperationCtx) setNodeDisplayName(ctx context.Context, node *wfv1.No | |
| newNode.DisplayName = displayName | ||
| woc.wf.Status.Nodes.Set(ctx, nodeID, *newNode) | ||
| } | ||
|
|
||
| func (woc *wfOperationCtx) updateLastSeenVersionAnnotation(value string) { | ||
| if woc.wf.GetAnnotations() == nil { | ||
| woc.wf.SetAnnotations(make(map[string]string)) | ||
| } | ||
| woc.wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion] = value | ||
| } | ||
|
|
||
| func (woc *wfOperationCtx) updateLastSeenVersion(value string) { | ||
| woc.controller.lastSeenVersions.mutex.Lock() | ||
| defer woc.controller.lastSeenVersions.mutex.Unlock() | ||
| if woc.controller.lastSeenVersions.versions == nil { | ||
| woc.controller.lastSeenVersions.versions = make(map[string]string) | ||
| } | ||
| woc.controller.lastSeenVersions.versions[woc.controller.getLastSeenVersionKey(woc.wf)] = value | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Outdated-event handling may spin indefinitely if invariants are broken
The early‑return in
processNextItem:is fine as long as
isOutdatedonly returnstruetemporarily (for genuinely stale states). However, if thelastSeenVersionsmap and thelast-seen-versionannotation ever diverge permanently for a non‑completed workflow (e.g. annotation removed or overwritten by a user/tool), this branch will:reconciliationNeededoroperate, effectively wedging that workflow.The robustness of
isOutdatedis therefore critical; see comment on its implementation below.🤖 Prompt for AI Agents