[Review] fix: workflow controller to detect stale workflows#33
Conversation
Signed-off-by: Eduardo Rodrigues <eduardodbr@hotmail.com>
Signed-off-by: Eduardo Rodrigues <eduardodbr@hotmail.com>
@coderabbitai full review
✅ Actions performed: Full review triggered.
Walkthrough
The changes introduce a version-tracking mechanism to the workflow controller by adding a new annotation constant, implementing a thread-safe map to store per-workflow resource versions, and integrating version checks and updates throughout the controller and operator processing paths to skip handling outdated workflow events.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- workflow/common/common.go (1 hunk)
- workflow/controller/controller.go (7 hunks)
- workflow/controller/operator.go (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
workflow/controller/operator.go (1)
workflow/common/common.go (1)
AnnotationKeyLastSeenVersion(61-61)
workflow/controller/controller.go (1)
workflow/common/common.go (1)
AnnotationKeyLastSeenVersion(61-61)
🔇 Additional comments (4)
workflow/common/common.go (1)
60-62: Last-seen-version annotation key looks consistent
The new AnnotationKeyLastSeenVersion follows the existing annotation naming pattern (workflow.WorkflowFullName + "/…") and is documented clearly. No issues spotted here.
workflow/controller/operator.go (1)
4403-4417: Helper methods are simple and concurrency-safe
updateLastSeenVersionAnnotation and updateLastSeenVersion are straightforward and correctly:
- Ensure the annotations map exists before writing,
- Guard the shared lastSeenVersions.versions map with the controller-level mutex,
- Use getLastSeenVersionKey(woc.wf) so keys line up with the controller side.
No issues here from a correctness or concurrency perspective.
workflow/controller/controller.go (2)
85-88: lastSeenVersions structure and initialisation look sound
The new lastSeenVersions wrapper and its embedding into WorkflowController are reasonable:
- Using a dedicated RWMutex to guard the map is appropriate given concurrent workers and informer handlers.
- Initialising versions: make(map[string]string) in NewWorkflowController avoids nil‑map edge cases.
- Keying by wf.GetUID() matches the comment (“key: workflow UID, value: resource version”) and avoids name/namespace reuse issues.
No changes needed here.
Also applies to: 158-163, 215-218
966-967: Good cleanup of last-seen state on completion and deletion
Calling:
    wfc.deleteLastSeenVersionKey(wfc.getLastSeenVersionKey(un))
when reconciliationNeeded(un) is false, and again from DeleteFunc, ensures that:
- Completed workflows have their lastSeenVersions entries cleared once they are no longer reconcilable.
- Deleted workflows also clean up their entries, avoiding leaks keyed by UID.
This matches the intended lifecycle of the in‑memory tracker.
Also applies to: 1024-1025
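The lifecycle described in the comments above can be sketched in isolation. This is a minimal illustration, not the PR's code: versionTracker and its Set/Get/Delete/Len methods are hypothetical stand-ins for the lastSeenVersions wrapper and its helpers, and the workflow UIDs are made-up strings.

```go
package main

import (
	"fmt"
	"sync"
)

// versionTracker is a hypothetical stand-in for the PR's lastSeenVersions
// wrapper: a map from workflow UID to resource version behind an RWMutex.
type versionTracker struct {
	mu       sync.RWMutex
	versions map[string]string
}

// newVersionTracker initialises the map up front so writers never hit a nil map.
func newVersionTracker() *versionTracker {
	return &versionTracker{versions: make(map[string]string)}
}

// Set records the resource version last written for a workflow UID.
func (t *versionTracker) Set(uid, rv string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.versions[uid] = rv
}

// Get returns the recorded version and whether an entry exists.
func (t *versionTracker) Get(uid string) (string, bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	rv, ok := t.versions[uid]
	return rv, ok
}

// Delete drops the entry. Deleting a missing key is a no-op, so calling it
// from both the completion path and the delete handler is safe.
func (t *versionTracker) Delete(uid string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.versions, uid)
}

// Len reports how many workflows are currently tracked.
func (t *versionTracker) Len() int {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return len(t.versions)
}

func main() {
	tracker := newVersionTracker()

	// Several workers record versions concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tracker.Set(fmt.Sprintf("uid-%d", i), "100")
		}(i)
	}
	wg.Wait()
	fmt.Println("tracked:", tracker.Len())

	tracker.Delete("uid-0") // workflow no longer needs reconciliation
	tracker.Delete("uid-0") // Delete event arrives later; harmless repeat
	fmt.Println("tracked after cleanup:", tracker.Len())
}
```

Because the second Delete is a no-op, the two cleanup sites do not need to coordinate with each other.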
    if wfc.isOutdated(un) {
        logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event")
        wfc.wfQueue.AddRateLimited(key)
        return true
    }
Outdated-event handling may spin indefinitely if invariants are broken
The early‑return in processNextItem:
    if wfc.isOutdated(un) {
        logger.WithField("key", key).Debug(ctx, "Skipping outdated workflow event")
        wfc.wfQueue.AddRateLimited(key)
        return true
    }
is fine as long as isOutdated only returns true temporarily (for genuinely stale states). However, if the lastSeenVersions map and the last-seen-version annotation ever diverge permanently for a non‑completed workflow (e.g. annotation removed or overwritten by a user/tool), this branch will:
- Always classify the workflow as outdated,
- Continuously re‑queue it with backoff,
- Never reach reconciliationNeeded or operate, effectively wedging that workflow.
The robustness of isOutdated is therefore critical; see comment on its implementation below.
🤖 Prompt for AI Agents
In workflow/controller/controller.go around lines 738-742, the early-return on
isOutdated(...) can cause a workflow to be perpetually re-queued if the
controller's lastSeenVersions map and the workflow's last-seen-version
annotation diverge permanently; change the handling so we don't indefinitely
AddRateLimited+return: detect permanent divergence (e.g. track retry count or
timestamp per key, or make isOutdated return a tri-state that indicates
permanent mismatch), and when that threshold is exceeded log a warning and
either clear/normalize the annotation or treat the item as not-outdated so
reconciliationNeeded/operate run (or drop the key without requeue if
appropriate). Ensure you record/inspect the retry count from the workqueue or
add a small per-key counter, avoid tight infinite requeues, and make the code
fall through to normal reconciliation or a terminal drop once the threshold is
reached.
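One way to read the "small per-key counter" option from the prompt above is sketched below. Everything here is hypothetical (staleGate, ShouldSkip, the limit value), and the type is not safe for concurrent use as written; a real controller would need a lock around it. If wfQueue is a client-go rate-limited workqueue, its NumRequeues and Forget methods could probably replace the custom counter.

```go
package main

import "fmt"

// staleGate is a hypothetical helper that counts consecutive "outdated"
// verdicts per key and stops skipping once a limit is exceeded.
// It is not goroutine-safe; a mutex would be needed in a real controller.
type staleGate struct {
	limit int
	skips map[string]int
}

func newStaleGate(limit int) *staleGate {
	return &staleGate{limit: limit, skips: make(map[string]int)}
}

// ShouldSkip reports whether an event should be skipped and requeued.
// A non-outdated event resets the counter. An outdated event is skipped
// until the counter exceeds the limit, after which the caller is expected
// to fall through to normal reconciliation.
func (g *staleGate) ShouldSkip(key string, outdated bool) bool {
	if !outdated {
		delete(g.skips, key)
		return false
	}
	g.skips[key]++
	if g.skips[key] > g.limit {
		delete(g.skips, key) // start fresh after giving up on skipping
		return false
	}
	return true
}

func main() {
	gate := newStaleGate(3)
	for attempt := 1; attempt <= 4; attempt++ {
		if gate.ShouldSkip("wf-uid", true) {
			fmt.Printf("attempt %d: skip and requeue\n", attempt)
			continue
		}
		fmt.Printf("attempt %d: limit reached, reconcile normally\n", attempt)
	}
}
```

The trade-off is that a genuinely stale object may be reconciled once the limit is hit, so the limit should be high enough to outlast ordinary informer lag.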
    func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
        wfc.lastSeenVersions.mutex.RLock()
        defer wfc.lastSeenVersions.mutex.RUnlock()
        lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
        // always process if not seen before
        if !ok || lastSeenRV == "" {
            return false
        }
        annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
        return annotations != lastSeenRV
    }

    func (wfc *WorkflowController) getLastSeenVersionKey(wf metav1.Object) string {
        return string(wf.GetUID())
    }

    func (wfc *WorkflowController) deleteLastSeenVersionKey(key string) {
        wfc.lastSeenVersions.mutex.Lock()
        defer wfc.lastSeenVersions.mutex.Unlock()
        delete(wfc.lastSeenVersions.versions, key)
    }
Make isOutdated resilient to missing or externally modified annotations
Current implementation:
    func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
        wfc.lastSeenVersions.mutex.RLock()
        defer wfc.lastSeenVersions.mutex.RUnlock()
        lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
        // always process if not seen before
        if !ok || lastSeenRV == "" {
            return false
        }
        annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
        return annotations != lastSeenRV
    }
Concerns:
- Annotation removed or not propagated
  If a workflow was processed once (so lastSeenVersions has an entry) and later some actor removes or overwrites workflows.argoproj.io/last-seen-version (e.g. via kubectl apply or kubectl annotate), then:
  - lastSeenRV remains non‑empty,
  - annotations becomes empty or different,
  - isOutdated will always return true for that workflow.
  Combined with the processNextItem early‑return, this can permanently wedge that workflow: it is perpetually classified as outdated and never reconciled again, with no self‑healing path.
- Inconsistent map/annotation due to rare paths
  Any future changes that update the map but not the annotation (or vice versa) will have the same effect: a workflow stuck behind isOutdated and continually re‑queued.
To make this more robust while preserving stale‑event protection, consider treating a missing or empty annotation as “we can’t safely apply staleness filtering” and either:
- Skip staleness filtering and process normally, or
- First drop the map entry so subsequent calls see !ok and also process normally.
For example:
 func (wfc *WorkflowController) isOutdated(wf metav1.Object) bool {
 	wfc.lastSeenVersions.mutex.RLock()
-	defer wfc.lastSeenVersions.mutex.RUnlock()
-	lastSeenRV, ok := wfc.lastSeenVersions.versions[wfc.getLastSeenVersionKey(wf)]
+	key := wfc.getLastSeenVersionKey(wf)
+	lastSeenRV, ok := wfc.lastSeenVersions.versions[key]
+	wfc.lastSeenVersions.mutex.RUnlock()
 	// always process if not seen before
 	if !ok || lastSeenRV == "" {
 		return false
 	}
-	annotations := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
-	return annotations != lastSeenRV
+	ann := wf.GetAnnotations()[common.AnnotationKeyLastSeenVersion]
+	if ann == "" {
+		// Annotation missing or cleared; drop stale map entry to avoid wedging this workflow.
+		wfc.deleteLastSeenVersionKey(key)
+		return false
+	}
+	return ann != lastSeenRV
 }
The read lock is released explicitly rather than deferred because deleteLastSeenVersionKey takes the write lock. This keeps the optimisation for genuine stale events while avoiding permanent starvation if the annotation is lost or touched by external tooling.
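The self-healing behaviour can be exercised without a cluster. The sketch below is an assumption-laden reduction of the suggestion: tracker and its isOutdated method are hypothetical, annotations are passed as a plain map instead of a metav1.Object, and the annotation key string is the one quoted in this comment. It also re-checks the entry under the write lock before deleting, which is an extra precaution not present in the suggested diff.

```go
package main

import (
	"fmt"
	"sync"
)

// lastSeenKey mirrors the annotation name quoted in the review comment.
const lastSeenKey = "workflows.argoproj.io/last-seen-version"

// tracker is a hypothetical reduction of the controller's lastSeenVersions.
type tracker struct {
	mu       sync.RWMutex
	versions map[string]string
}

// isOutdated reports whether an event should be treated as stale.
// A missing annotation clears the map entry instead of wedging the workflow.
func (t *tracker) isOutdated(uid string, annotations map[string]string) bool {
	t.mu.RLock()
	lastSeen, ok := t.versions[uid]
	// Release before taking the write lock below: sync.RWMutex cannot be
	// upgraded, and Lock while holding RLock would deadlock.
	t.mu.RUnlock()

	if !ok || lastSeen == "" {
		return false // never seen before: always process
	}
	ann := annotations[lastSeenKey] // indexing a nil map yields ""
	if ann == "" {
		t.mu.Lock()
		// Only drop the entry if nobody updated it since we read it.
		if t.versions[uid] == lastSeen {
			delete(t.versions, uid)
		}
		t.mu.Unlock()
		return false
	}
	return ann != lastSeen
}

func main() {
	tr := &tracker{versions: map[string]string{"uid-1": "41"}}

	matching := map[string]string{lastSeenKey: "41"}
	stale := map[string]string{lastSeenKey: "40"}

	fmt.Println("matching annotation outdated?", tr.isOutdated("uid-1", matching))
	fmt.Println("older annotation outdated?", tr.isOutdated("uid-1", stale))
	fmt.Println("annotation removed outdated?", tr.isOutdated("uid-1", nil))
	// The entry was dropped above, so the workflow is processed again.
	fmt.Println("after self-heal outdated?", tr.isOutdated("uid-1", stale))
}
```

Note that this only covers a removed annotation. An annotation overwritten with a different non-empty value still reads as outdated, so a retry bound in the caller remains useful.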
    oldRV := woc.wf.ResourceVersion
    woc.updateLastSeenVersionAnnotation(oldRV)
    wf, err := wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
Keep annotation and in-memory last-seen map in sync on all persist paths
The normal persistUpdates path correctly:
- Captures oldRV from woc.wf.ResourceVersion,
- Sets AnnotationKeyLastSeenVersion on the object before Update,
- Updates the controller’s lastSeenVersions map with the same oldRV.
However, the size-limit error path only updates the map:
    woc.wf = woc.orig.DeepCopy()
    woc.markWorkflowError(ctx, err)
    oldRV := woc.wf.ResourceVersion
    _, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
    if err != nil {
        ...
    } else {
        woc.updateLastSeenVersion(oldRV)
    }
If the previous persisted object already had a last-seen-version annotation, this makes the map hold oldRV while the persisted annotation still holds the previous value. WorkflowController.isOutdated will then see annotations != lastSeenRV and treat subsequent events for this workflow as stale, repeatedly re‑queuing and never operating on them, until the workflow is completed/deleted and the key is cleaned up.
To preserve the invariant that the map and annotation always match after any successful write, the size‑limit path should also set the annotation before the Update, mirroring the main path.
Suggested diff:
 func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
 	woc.wf = woc.orig.DeepCopy()
 	woc.markWorkflowError(ctx, err)
-	oldRV := woc.wf.ResourceVersion
-	_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
+	oldRV := woc.wf.ResourceVersion
+	woc.updateLastSeenVersionAnnotation(oldRV)
+	_, err = wfClient.Update(ctx, woc.wf, metav1.UpdateOptions{})
 	if err != nil {
 		woc.log.WithError(err).Warn(ctx, "Error updating workflow with size error")
 	} else {
 		woc.updateLastSeenVersion(oldRV)
 	}
 }
This keeps the annotation and lastSeenVersions map coherent across both success paths.
Also applies to: 789-789, 865-871
🤖 Prompt for AI Agents
In workflow/controller/operator.go around lines 764-766 (and similarly at 789
and 865-871), the size-limit error path updates only the in-memory
lastSeenVersions map but does not set the AnnotationKeyLastSeenVersion on the
workflow before persisting, causing the annotation and map to diverge; fix by
capturing oldRV := woc.wf.ResourceVersion, call
woc.updateLastSeenVersionAnnotation(oldRV) on the workflow object before calling
wfClient.Update(...), then on successful Update call
woc.updateLastSeenVersion(oldRV) to keep the persisted annotation and in-memory
map in sync (apply same change to the other mentioned blocks).
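One way to make the invariant hard to break is to route every persist path through a single helper. The sketch below is purely illustrative: workflow, updateFunc, persistWithLastSeen, okUpdate and failUpdate are hypothetical names, and the update callback stands in for wfClient.Update.

```go
package main

import (
	"errors"
	"fmt"
)

// lastSeenKey mirrors the annotation name quoted earlier in the review.
const lastSeenKey = "workflows.argoproj.io/last-seen-version"

// workflow is a hypothetical, stripped-down stand-in for the Workflow object.
type workflow struct {
	ResourceVersion string
	Annotations     map[string]string
}

// updateFunc stands in for the API call that persists the workflow.
type updateFunc func(wf *workflow) error

var errUpdate = errors.New("update failed")

func okUpdate(*workflow) error   { return nil }
func failUpdate(*workflow) error { return errUpdate }

// persistWithLastSeen stamps the annotation, persists, and only then records
// the same value in the in-memory map, so both change together on success.
func persistWithLastSeen(wf *workflow, uid string, seen map[string]string, update updateFunc) error {
	oldRV := wf.ResourceVersion
	if wf.Annotations == nil {
		wf.Annotations = make(map[string]string)
	}
	wf.Annotations[lastSeenKey] = oldRV
	if err := update(wf); err != nil {
		// Nothing was persisted, so the map is left untouched.
		return err
	}
	seen[uid] = oldRV
	return nil
}

func main() {
	seen := make(map[string]string)

	wf := &workflow{ResourceVersion: "7"}
	err := persistWithLastSeen(wf, "uid-1", seen, okUpdate)
	fmt.Println("success path:", err, seen["uid-1"], wf.Annotations[lastSeenKey])

	wf2 := &workflow{ResourceVersion: "8"}
	err = persistWithLastSeen(wf2, "uid-2", seen, failUpdate)
	_, tracked := seen["uid-2"]
	fmt.Println("failure path:", err, "tracked:", tracked)
}
```

With a helper of this shape, the size-limit path and the normal path could not drift apart, since neither would touch the annotation or the map directly.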
Duplicate of argoproj#15090 for CodeRabbit review.
Motivation
Multiple issues have been created because of unexpected workflow behavior:
argoproj#13986
argoproj#14833
argoproj#12352
argoproj#14780
It appears that many of these issues occur because the controller is processing an outdated version of the workflow. The exact cause of these stale reads is still unknown, but there is some suspicion that it may be related to the informer write-back mechanism, which is being disabled by default in argoproj#15079.
This PR ensures that stale workflow versions are not reconciled by keeping track of the last processed resource version for each workflow in a last-seen-version annotation. A workflow is only processed when its annotation matches the expected version; otherwise, it is re-queued. The annotation stores the workflow’s resource version, though any unique value would work; the RV seemed sufficient.
Modifications
- last-seen-version annotation, updated with the current resource version on every Update() event.
- In-memory last-seen entries are removed when a Delete event is received or when the workflow completes.
Verification
Ran workflows successfully.
Documentation