fix: sync cluster Workflow Template Informer before it's used (#8961)
* fix: After Controller leader election, we need to make sure the clusterWorkflowTemplateInformer's cache is synced before we call Lister() on it; otherwise the lister is used before it's ready (see the sketch just before the file diffs below).

Signed-off-by: Julie Vogelman <[email protected]>

* fix: empty commit

Signed-off-by: Julie Vogelman <[email protected]>

* fix: empty commit

Signed-off-by: Julie Vogelman <[email protected]>

* fix: fix condition for exiting function

Signed-off-by: Julie Vogelman <[email protected]>

* feat: for efficiency, move the call to create the cluster workflow template informer before we sync the other caches

Signed-off-by: Julie Vogelman <[email protected]>

* feat: empty commit

Signed-off-by: Julie Vogelman <[email protected]>

* feat: empty commit

Signed-off-by: Julie Vogelman <[email protected]>

* fix: additional log line for error just in case

Signed-off-by: Julie Vogelman <[email protected]>

* fix: additional error handling

Signed-off-by: Julie Vogelman <[email protected]>

* fix: revert previous change - nil is valid

Signed-off-by: Julie Vogelman <[email protected]>

* fix: for safety add verification that pointer isn't nil before use

Signed-off-by: Julie Vogelman <[email protected]>
juliev0 authored and sarabala1979 committed Jun 20, 2022
1 parent 4d9f8f7 commit 1d26628
Showing 2 changed files with 21 additions and 8 deletions.
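Before the diffs, a minimal sketch of the client-go pattern this fix relies on: an informer's Run is asynchronous, so the controller must block on cache.WaitForCacheSync before anything reads from the informer's cache. The package and helper names (informerutil, startAndSync) are hypothetical, used only for illustration, and are not part of this commit.

package informerutil

import (
    "context"
    "log"

    "k8s.io/client-go/tools/cache"
)

// startAndSync starts a shared informer and blocks until its cache has synced.
// Only after WaitForCacheSync returns true is it safe to read from the
// informer's store (or a lister built on top of it); this is the ordering the
// commit enforces for the ClusterWorkflowTemplate informer after leader election.
// Hypothetical helper, shown for illustration only.
func startAndSync(ctx context.Context, inf cache.SharedIndexInformer) {
    go inf.Run(ctx.Done())
    if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
        log.Fatal("timed out waiting for informer cache to sync")
    }
}

In the controller this corresponds to calling createClusterWorkflowTemplateInformer before the existing WaitForCacheSync in Run, plus the extra WaitForCacheSync added inside createClusterWorkflowTemplateInformer itself, as shown in the diffs below.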
11 changes: 9 additions & 2 deletions workflow/controller/controller.go
@@ -259,6 +259,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
    go wfc.configMapInformer.Run(ctx.Done())
    go wfc.wfTaskSetInformer.Informer().Run(ctx.Done())
    go wfc.taskResultInformer.Run(ctx.Done())
+   wfc.createClusterWorkflowTemplateInformer(ctx)

    // Wait for all involved caches to be synced, before processing items from the queue is started
    if !cache.WaitForCacheSync(
@@ -273,8 +274,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
        log.Fatal("Timed out waiting for caches to sync")
    }

-   wfc.createClusterWorkflowTemplateInformer(ctx)
-
    // Start the metrics server
    go wfc.metrics.RunServer(ctx)

@@ -416,6 +415,14 @@ func (wfc *WorkflowController) createClusterWorkflowTemplateInformer(ctx context
    if cwftGetAllowed && cwftListAllowed && cwftWatchAllowed {
        wfc.cwftmplInformer = informer.NewTolerantClusterWorkflowTemplateInformer(wfc.dynamicInterface, clusterWorkflowTemplateResyncPeriod)
        go wfc.cwftmplInformer.Informer().Run(ctx.Done())
+
+       // since the above call is asynchronous, make sure we populate our cache before we try to use it later
+       if !cache.WaitForCacheSync(
+           ctx.Done(),
+           wfc.cwftmplInformer.Informer().HasSynced,
+       ) {
+           log.Fatal("Timed out waiting for ClusterWorkflowTemplate cache to sync")
+       }
    } else {
        log.Warnf("Controller doesn't have RBAC access for ClusterWorkflowTemplates")
    }
18 changes: 12 additions & 6 deletions workflow/controller/operator.go
@@ -1159,7 +1159,11 @@ func printPodSpecLog(pod *apiv1.Pod, wfName string) {
// and returns the new node status if something changed
func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus) *wfv1.NodeStatus {
    new := old.DeepCopy()
-   tmpl := woc.GetNodeTemplate(old)
+   tmpl, err := woc.GetNodeTemplate(old)
+   if err != nil {
+       woc.log.Error(err)
+       return nil
+   }
    switch pod.Status.Phase {
    case apiv1.PodPending:
        new.Phase = wfv1.NodePending
@@ -1170,7 +1174,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
        new.Daemoned = nil
    case apiv1.PodFailed:
        // ignore pod failure for daemoned steps
-       if tmpl.IsDaemon() {
+       if tmpl != nil && tmpl.IsDaemon() {
            new.Phase = wfv1.NodeSucceeded
        } else {
            new.Phase, new.Message = woc.inferFailedReason(pod, tmpl)
@@ -1180,7 +1184,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
        // Daemons are a special case we need to understand the rules:
        // A node transitions into "daemoned" only if it's a daemon template and it becomes running AND ready.
        // A node will be unmarked "daemoned" when its boundary node completes, anywhere killDaemonedChildren is called.
-       if tmpl.IsDaemon() {
+       if tmpl != nil && tmpl.IsDaemon() {
            if !old.Fulfilled() {
                // pod is running and template is marked daemon. check if everything is ready
                for _, ctrStatus := range pod.Status.ContainerStatuses {
@@ -2084,19 +2088,21 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool {
    return false
}

-func (woc *wfOperationCtx) GetNodeTemplate(node *wfv1.NodeStatus) *wfv1.Template {
+func (woc *wfOperationCtx) GetNodeTemplate(node *wfv1.NodeStatus) (*wfv1.Template, error) {
    if node.TemplateRef != nil {
        tmplCtx, err := woc.createTemplateContext(node.GetTemplateScope())
        if err != nil {
            woc.markNodeError(node.Name, err)
+           return nil, err
        }
        tmpl, err := tmplCtx.GetTemplateFromRef(node.TemplateRef)
        if err != nil {
            woc.markNodeError(node.Name, err)
+           return tmpl, err
        }
-       return tmpl
+       return tmpl, nil
    }
-   return woc.wf.GetTemplateByName(node.TemplateName)
+   return woc.wf.GetTemplateByName(node.TemplateName), nil
}

func (woc *wfOperationCtx) markWorkflowRunning(ctx context.Context) {
