Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Fail workflow if input to dynamic fork join task is invalid
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx committed Oct 6, 2022
1 parent e4b5e3c commit 3ef90f9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,17 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
}

for (TaskModel forkedTask : forkedTasks) {
Map<String, Object> forkedTaskInput =
tasksInput.get(forkedTask.getReferenceTaskName());
forkedTask.addInput(forkedTaskInput);
try {
Map<String, Object> forkedTaskInput =
tasksInput.get(forkedTask.getReferenceTaskName());
forkedTask.addInput(forkedTaskInput);
} catch (Exception e) {
String reason =
String.format(
"Tasks could not be dynamically forked due to invalid input: %s",
e.getMessage());
throw new TerminateWorkflowException(reason);
}
}
mappedTasks.addAll(forkedTasks);
// Get the last of the dynamic tasks so that the join can be performed once this task is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.tasks.TaskType
import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams
import com.netflix.conductor.common.metadata.workflow.WorkflowDef
import com.netflix.conductor.common.metadata.workflow.WorkflowTask
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.execution.StartWorkflowInput
import com.netflix.conductor.core.execution.tasks.SubWorkflow
import com.netflix.conductor.dao.QueueDAO
import com.netflix.conductor.test.base.AbstractSpecification
Expand Down Expand Up @@ -777,4 +779,82 @@ class DynamicForkJoinSpec extends AbstractSpecification {
tasks[5].status == Task.Status.COMPLETED
}
}

def "Test dynamic fork join fail when task input is invalid"() {
when: "a dynamic fork join workflow is started"
def workflowInstanceId = startWorkflow(DYNAMIC_FORK_JOIN_WF, 1,
'dynamic_fork_join_workflow', [:],
null)

then: "verify that the workflow has been successfully started and the first task is in scheduled state"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.SCHEDULED
}

when: " the first task is 'integration_task_1' output has a list of dynamic tasks"
WorkflowTask workflowTask2 = new WorkflowTask()
workflowTask2.name = 'integration_task_2'
workflowTask2.taskReferenceName = 'xdt1'

WorkflowTask workflowTask3 = new WorkflowTask()
workflowTask3.name = 'integration_task_3'
workflowTask3.taskReferenceName = 'xdt2'

def invalidDynamicTasksInput = ['xdt1': 'v1', 'xdt2': 'v2']

and: "The 'integration_task_1' is polled and completed"
def pollAndCompleteTask1Try = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.worker',
['dynamicTasks': [workflowTask2, workflowTask3], 'dynamicTasksInput': invalidDynamicTasksInput])

then: "verify that the task was completed"
workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask1Try)

and: "verify that workflow failed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.size() == 1
tasks[0].taskType == 'integration_task_1'
tasks[0].status == Task.Status.COMPLETED
}
}

def "Test dynamic fork join return failed workflow when start with invalid input"() {
when: "a dynamic fork join workflow is started"
WorkflowTask workflowTask2 = new WorkflowTask()
workflowTask2.name = 'integration_task_2'
workflowTask2.taskReferenceName = 'xdt1'

WorkflowTask workflowTask3 = new WorkflowTask()
workflowTask3.name = 'integration_task_3'
workflowTask3.taskReferenceName = 'xdt2'

def invalidDynamicTasksInput = ['xdt1': 'v1', 'xdt2': 'v2']
def workflowInput = ['dynamicTasks': [workflowTask2, workflowTask3], 'dynamicTasksInput': invalidDynamicTasksInput]

def dynamicForkJoinTask = new WorkflowTask()
dynamicForkJoinTask.name = 'dynamicfanouttask'
dynamicForkJoinTask.taskReferenceName = 'dynamicfanouttask'
dynamicForkJoinTask.type = 'FORK_JOIN_DYNAMIC'
dynamicForkJoinTask.inputParameters = ['dynamicTasks': '${workflow.input.dynamicTasks}', 'dynamicTasksInput': '${workflow.input.dynamicTasksInput}']
dynamicForkJoinTask.dynamicForkTasksParam = 'dynamicTasks'
dynamicForkJoinTask.dynamicForkTasksInputParamName = 'dynamicTasksInput'

def workflowDef = new WorkflowDef()
workflowDef.name = 'DynamicForkJoinStartTest'
workflowDef.version = 1
workflowDef.tasks.add(dynamicForkJoinTask)
workflowDef.ownerEmail = '[email protected]'

def startWorkflowInput = new StartWorkflowInput(name: workflowDef.name, version: workflowDef.version, workflowInput: workflowInput, workflowDefinition: workflowDef)
def workflowInstanceId = workflowExecutor.startWorkflow(startWorkflowInput)

then: "verify that workflow failed"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.FAILED
tasks.isEmpty()
}
}
}

0 comments on commit 3ef90f9

Please sign in to comment.