From 04d19435cb07e8815f1f95cca6751f8ce6b4bec1 Mon Sep 17 00:00:00 2001 From: Yuan Tang Date: Thu, 25 Aug 2022 11:03:55 -0400 Subject: [PATCH] fix: Properly reset suspended and skipped nodes when retrying (#9422) * chore: wip Signed-off-by: Yuan Tang * chore: wip Signed-off-by: Yuan Tang * chore: clean up Signed-off-by: Yuan Tang * chore: clean up Signed-off-by: Yuan Tang * chore: Add test Signed-off-by: Yuan Tang * chore: fix Signed-off-by: Yuan Tang Signed-off-by: Yuan Tang --- workflow/util/util.go | 19 ++++++++++++++++++- workflow/util/util_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/workflow/util/util.go b/workflow/util/util.go index 664dc0750f90..a60061559e07 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -919,7 +919,24 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce } func resetNode(node wfv1.NodeStatus) wfv1.NodeStatus { - node.Phase = wfv1.NodeRunning + // The previously supplied parameters needed to be reset. Otherwise, `argo node reset` would not work as expected. + if node.Type == wfv1.NodeTypeSuspend { + if node.Outputs != nil { + for i, param := range node.Outputs.Parameters { + node.Outputs.Parameters[i] = wfv1.Parameter{ + Name: param.Name, + Value: nil, + ValueFrom: &wfv1.ValueFrom{Supplied: &wfv1.SuppliedValueFrom{}}, + } + } + } + } + if node.Phase == wfv1.NodeSkipped { + // The skipped nodes need to be kept as skipped. Otherwise, the workflow will be stuck on running. + node.Phase = wfv1.NodeSkipped + } else { + node.Phase = wfv1.NodeRunning + } node.Message = "" node.StartedAt = metav1.Time{Time: time.Now().UTC()} node.FinishedAt = metav1.Time{} diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go index c55f920f53dc..e9a069378f75 100644 --- a/workflow/util/util_test.go +++ b/workflow/util/util_test.go @@ -901,6 +901,40 @@ func TestFormulateRetryWorkflow(t *testing.T) { } }) + t.Run("Skipped and Suspended Nodes", func(t *testing.T) { + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "wf-with-skipped-and-suspended-nodes", + Labels: map[string]string{}, + }, + Status: wfv1.WorkflowStatus{ + Phase: wfv1.WorkflowFailed, + Nodes: map[string]wfv1.NodeStatus{ + "my-nested-dag-1": {ID: "my-nested-dag-1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeTaskGroup}, + "suspended": {ID: "suspended", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypeSuspend, BoundaryID: "my-nested-dag-1", Outputs: &wfv1.Outputs{Parameters: []wfv1.Parameter{{ + Name: "param-1", + Value: wfv1.AnyStringPtr("3"), + ValueFrom: &wfv1.ValueFrom{Supplied: &wfv1.SuppliedValueFrom{}}, + }}}}, + "skipped": {ID: "skipped", Phase: wfv1.NodeSkipped, Type: wfv1.NodeTypeSkipped, BoundaryID: "suspended"}, + }}, + } + _, err := wfClient.Create(ctx, wf, metav1.CreateOptions{}) + assert.NoError(t, err) + wf, _, err = FormulateRetryWorkflow(ctx, wf, true, "id=suspended", nil) + if assert.NoError(t, err) { + if assert.Len(t, wf.Status.Nodes, 3) { + assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["my-nested-dag-1"].Phase) + assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["suspended"].Phase) + assert.Equal(t, wfv1.Parameter{ + Name: "param-1", + Value: nil, + ValueFrom: &wfv1.ValueFrom{Supplied: &wfv1.SuppliedValueFrom{}}, + }, wf.Status.Nodes["suspended"].Outputs.Parameters[0]) + assert.Equal(t, wfv1.NodeSkipped, wf.Status.Nodes["skipped"].Phase) + } + } + }) t.Run("Nested DAG with Non-group Node Selected", func(t *testing.T) { wf := &wfv1.Workflow{ ObjectMeta: metav1.ObjectMeta{