Skip to content

Commit

Permalink
fix: Ensure HTTP reconciliation occurs for onExit nodes (#7084)
Browse files Browse the repository at this point in the history
* fix: Ensure HTTP reconciliation occurs for onExit nodes

Signed-off-by: Simon Behar <[email protected]>

* tests

Signed-off-by: Simon Behar <[email protected]>

* fixes

Signed-off-by: Simon Behar <[email protected]>

* minor

Signed-off-by: Simon Behar <[email protected]>
  • Loading branch information
simster7 authored Oct 27, 2021
1 parent d6a62c3 commit 64fce4a
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 25 deletions.
2 changes: 0 additions & 2 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,9 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
podName := woc.getAgentPodName()

obj, exists, err := woc.controller.podInformer.GetStore().Get(cache.ExplicitKey(woc.wf.Namespace + "/" + podName))

if err != nil {
return nil, fmt.Errorf("failed to get pod from informer store: %w", err)
}

if exists {
existing, ok := obj.(*apiv1.Pod)
if ok {
Expand Down
61 changes: 61 additions & 0 deletions workflow/controller/exit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)
Expand Down Expand Up @@ -687,3 +688,63 @@ func TestDagOnExitAndRetryStrategy(t *testing.T) {

assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

var testWorkflowOnExitHttpReconciliation = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world-sx6lw
spec:
entrypoint: whalesay
onExit: exit-handler
templates:
- container:
args:
- hello world
command:
- cowsay
image: docker/whalesay:latest
name: whalesay
- http:
url: https://example.com
name: exit-handler
status:
nodes:
hello-world-sx6lw:
displayName: hello-world-sx6lw
finishedAt: "2021-10-27T14:38:30Z"
hostNodeName: k3d-k3s-default-server-0
id: hello-world-sx6lw
name: hello-world-sx6lw
phase: Succeeded
progress: 1/1
resourcesDuration:
cpu: 2
memory: 1
startedAt: "2021-10-27T14:38:27Z"
templateName: whalesay
templateScope: local/hello-world-sx6lw
type: Pod
phase: Running
startedAt: "2021-10-27T14:38:27Z"
`

func TestWorkflowOnExitHttpReconciliation(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(testWorkflowOnExitHttpReconciliation)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)

taskSets, err := woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets("").List(ctx, v1.ListOptions{})
if assert.NoError(t, err) {
assert.Len(t, taskSets.Items, 0)
}
woc.operate(ctx)

assert.Len(t, woc.wf.Status.Nodes, 2)
taskSets, err = woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTaskSets("").List(ctx, v1.ListOptions{})
if assert.NoError(t, err) {
assert.Len(t, taskSets.Items, 1)
}
}
36 changes: 36 additions & 0 deletions workflow/controller/http_template.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"context"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

Expand All @@ -12,3 +14,37 @@ func (woc *wfOperationCtx) executeHTTPTemplate(nodeName string, templateScope st
}
return node
}

func (woc *wfOperationCtx) httpReconciliation(ctx context.Context) {
err := woc.reconcileTaskSet(ctx)
if err != nil {
woc.log.WithError(err).Error("error in workflowtaskset reconciliation")
return
}

err = woc.reconcileAgentPod(ctx)
if err != nil {
woc.log.WithError(err).Error("error in agent pod reconciliation")
woc.markWorkflowError(ctx, err)
return
}
}

func (woc *wfOperationCtx) nodeRequiresHttpReconciliation(nodeName string) bool {
node := woc.wf.GetNodeByName(nodeName)
if node == nil {
return false
}
// If this node is of type HTTP, it will need an HTTP reconciliation
if node.Type == wfv1.NodeTypeHTTP {
return true
}
for _, child := range node.Children {
// If any of the node's children need an HTTP reconciliation, the parent node will also need one
if woc.nodeRequiresHttpReconciliation(child) {
return true
}
}
// If neither of the children need one -- or if there are no children -- no HTTP reconciliation is needed.
return false
}
41 changes: 41 additions & 0 deletions workflow/controller/http_template_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package controller

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func TestNodeRequiresHttpReconciliation(t *testing.T) {
woc := &wfOperationCtx{
wf: &v1alpha1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: "test-wf",
},
Status: v1alpha1.WorkflowStatus{
Nodes: v1alpha1.Nodes{
"test-wf-1996333140": v1alpha1.NodeStatus{
Name: "not-needed",
Type: v1alpha1.NodeTypePod,
},
"test-wf-3939368189": v1alpha1.NodeStatus{
Name: "parent",
Type: v1alpha1.NodeTypeSteps,
Children: []string{"child-http"},
},
"test-wf-1430055856": v1alpha1.NodeStatus{
Name: "child-http",
Type: v1alpha1.NodeTypeHTTP,
},
},
},
},
}

assert.False(t, woc.nodeRequiresHttpReconciliation("not-needed"))
assert.True(t, woc.nodeRequiresHttpReconciliation("child-http"))
assert.True(t, woc.nodeRequiresHttpReconciliation("parent"))
}
20 changes: 8 additions & 12 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,18 +352,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
return
}

err = woc.taskSetReconciliation(ctx)
if err != nil {
woc.log.WithError(err).Error("error in workflowtaskset reconciliation")
return
}

err = woc.reconcileAgentPod(ctx)
if err != nil {
woc.log.WithError(err).Error("error in agent pod reconciliation")
woc.markWorkflowError(ctx, err)
return
}
// Reconcile TaskSet and Agent for HTTP templates
woc.httpReconciliation(ctx)

if node == nil || !node.Fulfilled() {
// node can be nil if a workflow created immediately in a parallelism == 0 state
Expand Down Expand Up @@ -423,6 +413,12 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
return
}

// If the onExit node (or any child of the onExit node) requires HTTP reconciliation, do it here
if onExitNode != nil && woc.nodeRequiresHttpReconciliation(onExitNode.Name) {
woc.httpReconciliation(ctx)
}

if onExitNode == nil || !onExitNode.Fulfilled() {
return
}
Expand Down
19 changes: 10 additions & 9 deletions workflow/controller/taskset.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,32 +69,33 @@ func (woc *wfOperationCtx) completeTaskSet(ctx context.Context) error {
}

func (woc *wfOperationCtx) getWorkflowTaskSet() (*wfv1.WorkflowTaskSet, error) {
taskSet, exist, err := woc.controller.wfTaskSetInformer.Informer().GetIndexer().GetByKey(woc.wf.Namespace + "/" + woc.wf.Name)
taskSet, exists, err := woc.controller.wfTaskSetInformer.Informer().GetIndexer().GetByKey(woc.wf.Namespace + "/" + woc.wf.Name)
if err != nil {
return nil, err
}
if !exist {
if !exists {
return nil, nil
}

return taskSet.(*wfv1.WorkflowTaskSet), nil
}

func (woc *wfOperationCtx) taskSetReconciliation(ctx context.Context) error {
workflowTaskset, err := woc.getWorkflowTaskSet()
func (woc *wfOperationCtx) reconcileTaskSet(ctx context.Context) error {
workflowTaskSet, err := woc.getWorkflowTaskSet()
if err != nil {
return err
}

woc.log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Infof("TaskSet Reconciliation")
if workflowTaskset != nil && len(workflowTaskset.Status.Nodes) > 0 {
for nodeID, taskResult := range workflowTaskset.Status.Nodes {
if workflowTaskSet != nil && len(workflowTaskSet.Status.Nodes) > 0 {
for nodeID, taskResult := range workflowTaskSet.Status.Nodes {
node := woc.wf.Status.Nodes[nodeID]

node.Outputs = taskResult.Outputs.DeepCopy()
node.Phase = taskResult.Phase
node.Message = taskResult.Message
woc.wf.Status.Nodes[nodeID] = node
node.FinishedAt = metav1.Now()

woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
}
}
Expand All @@ -105,6 +106,7 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
if len(woc.taskSet) == 0 {
return nil
}

key := fmt.Sprintf("%s/%s", woc.wf.Namespace, woc.wf.Name)
log.WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).WithField("TaskSet", key).Infof("Creating TaskSet")
taskSet := wfv1.WorkflowTaskSet{
Expand Down Expand Up @@ -143,7 +145,6 @@ func (woc *wfOperationCtx) createTaskSet(ctx context.Context) error {
log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to patch WorkflowTaskSet")
return fmt.Errorf("failed to patch TaskSet. %v", err)
}

} else if err != nil {
log.WithError(err).WithField("workflow", woc.wf.Name).WithField("namespace", woc.wf.Namespace).Error("Failed to create WorkflowTaskSet")
return err
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/taskset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func TestNonHTTPTemplateScenario(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(helloWorldWf)
woc := newWorkflowOperationCtx(wf, controller)
ctx := context.Background()
t.Run("taskSetReconciliation", func(t *testing.T) {
t.Run("reconcileTaskSet", func(t *testing.T) {
woc.operate(ctx)
err := woc.taskSetReconciliation(ctx)
err := woc.reconcileTaskSet(ctx)
assert.NoError(t, err)
})
t.Run("completeTaskSet", func(t *testing.T) {
Expand Down

0 comments on commit 64fce4a

Please sign in to comment.