diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java
index fde1899421..905424b0c2 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java
@@ -195,9 +195,17 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
         }
 
         for (TaskModel forkedTask : forkedTasks) {
-            Map<String, Object> forkedTaskInput =
-                    tasksInput.get(forkedTask.getReferenceTaskName());
-            forkedTask.addInput(forkedTaskInput);
+            try {
+                Map<String, Object> forkedTaskInput =
+                        tasksInput.get(forkedTask.getReferenceTaskName());
+                forkedTask.addInput(forkedTaskInput);
+            } catch (Exception e) {
+                String reason =
+                        String.format(
+                                "Tasks could not be dynamically forked due to invalid input: %s",
+                                e.getMessage());
+                throw new TerminateWorkflowException(reason);
+            }
         }
         mappedTasks.addAll(forkedTasks);
         // Get the last of the dynamic tasks so that the join can be performed once this task is
diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy
index a7db27e93c..c23a73d39e 100644
--- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy
+++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/DynamicForkJoinSpec.groovy
@@ -18,8 +18,10 @@ import com.netflix.conductor.common.metadata.tasks.Task
 import com.netflix.conductor.common.metadata.tasks.TaskDef
 import com.netflix.conductor.common.metadata.tasks.TaskType
 import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef
 import com.netflix.conductor.common.metadata.workflow.WorkflowTask
 import com.netflix.conductor.common.run.Workflow
+import com.netflix.conductor.core.execution.StartWorkflowInput
 import com.netflix.conductor.core.execution.tasks.SubWorkflow
 import com.netflix.conductor.dao.QueueDAO
 import com.netflix.conductor.test.base.AbstractSpecification
@@ -777,4 +779,82 @@ class DynamicForkJoinSpec extends AbstractSpecification {
             tasks[5].status == Task.Status.COMPLETED
         }
     }
+
+    def "Test dynamic fork join fails when task input is invalid"() {
+        when: "a dynamic fork join workflow is started"
+        def workflowInstanceId = startWorkflow(DYNAMIC_FORK_JOIN_WF, 1,
+                'dynamic_fork_join_workflow', [:],
+                null)
+
+        then: "verify that the workflow has been successfully started and the first task is in scheduled state"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.RUNNING
+            tasks.size() == 1
+            tasks[0].taskType == 'integration_task_1'
+            tasks[0].status == Task.Status.SCHEDULED
+        }
+
+        when: "the first task 'integration_task_1' output has a list of dynamic tasks and an invalid dynamic tasks input"
+        WorkflowTask workflowTask2 = new WorkflowTask()
+        workflowTask2.name = 'integration_task_2'
+        workflowTask2.taskReferenceName = 'xdt1'
+
+        WorkflowTask workflowTask3 = new WorkflowTask()
+        workflowTask3.name = 'integration_task_3'
+        workflowTask3.taskReferenceName = 'xdt2'
+
+        def invalidDynamicTasksInput = ['xdt1': 'v1', 'xdt2': 'v2']
+
+        and: "the 'integration_task_1' is polled and completed"
+        def pollAndCompleteTask1Try = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.worker',
+                ['dynamicTasks': [workflowTask2, workflowTask3], 'dynamicTasksInput': invalidDynamicTasksInput])
+
+        then: "verify that the task was completed"
+        workflowTestUtil.verifyPolledAndAcknowledgedTask(pollAndCompleteTask1Try)
+
+        and: "verify that the workflow failed"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.FAILED
+            tasks.size() == 1
+            tasks[0].taskType == 'integration_task_1'
+            tasks[0].status == Task.Status.COMPLETED
+        }
+    }
+
+    def "Test dynamic fork join returns a failed workflow when started with invalid input"() {
+        when: "a workflow with a dynamic fork join task is started with invalid dynamic tasks input"
+        WorkflowTask workflowTask2 = new WorkflowTask()
+        workflowTask2.name = 'integration_task_2'
+        workflowTask2.taskReferenceName = 'xdt1'
+
+        WorkflowTask workflowTask3 = new WorkflowTask()
+        workflowTask3.name = 'integration_task_3'
+        workflowTask3.taskReferenceName = 'xdt2'
+
+        def invalidDynamicTasksInput = ['xdt1': 'v1', 'xdt2': 'v2']
+        def workflowInput = ['dynamicTasks': [workflowTask2, workflowTask3], 'dynamicTasksInput': invalidDynamicTasksInput]
+
+        def dynamicForkJoinTask = new WorkflowTask()
+        dynamicForkJoinTask.name = 'dynamicfanouttask'
+        dynamicForkJoinTask.taskReferenceName = 'dynamicfanouttask'
+        dynamicForkJoinTask.type = 'FORK_JOIN_DYNAMIC'
+        dynamicForkJoinTask.inputParameters = ['dynamicTasks': '${workflow.input.dynamicTasks}', 'dynamicTasksInput': '${workflow.input.dynamicTasksInput}']
+        dynamicForkJoinTask.dynamicForkTasksParam = 'dynamicTasks'
+        dynamicForkJoinTask.dynamicForkTasksInputParamName = 'dynamicTasksInput'
+
+        def workflowDef = new WorkflowDef()
+        workflowDef.name = 'DynamicForkJoinStartTest'
+        workflowDef.version = 1
+        workflowDef.tasks.add(dynamicForkJoinTask)
+        workflowDef.ownerEmail = 'test@harness.com'
+
+        def startWorkflowInput = new StartWorkflowInput(name: workflowDef.name, version: workflowDef.version, workflowInput: workflowInput, workflowDefinition: workflowDef)
+        def workflowInstanceId = workflowExecutor.startWorkflow(startWorkflowInput)
+
+        then: "verify that the workflow failed without scheduling any tasks"
+        with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
+            status == Workflow.WorkflowStatus.FAILED
+            tasks.isEmpty()
+        }
+    }
 }
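A minimal, self-contained Java sketch of the failure path these tests exercise, under stated assumptions: the class name DynamicForkInputGuardSketch, the helper resolveForkedTaskInput, and the local TerminateWorkflowException are illustrative stand-ins rather than Conductor's own types. With the invalid dynamicTasksInput used in the specs (plain strings where per-task input maps are expected), looking up a forked task's input fails, and the guard rethrows the failure as a workflow-terminating exception using the same message format as the hunk above.

import java.util.HashMap;
import java.util.Map;

// Sketch only: illustrates the try/catch guard added to ForkJoinDynamicTaskMapper,
// not the actual Conductor implementation.
public class DynamicForkInputGuardSketch {

    // Local stand-in for Conductor's TerminateWorkflowException.
    static class TerminateWorkflowException extends RuntimeException {
        TerminateWorkflowException(String reason) {
            super(reason);
        }
    }

    @SuppressWarnings("unchecked")
    static Map<String, Object> resolveForkedTaskInput(Object dynamicTasksInput, String refName) {
        try {
            // The mapper expects a map of task-reference-name -> input map. With the
            // invalid payload from the tests (['xdt1': 'v1']), the value is a String,
            // so the implicit cast on get(...) throws ClassCastException.
            Map<String, Map<String, Object>> tasksInput =
                    (Map<String, Map<String, Object>>) dynamicTasksInput;
            Map<String, Object> forkedTaskInput = tasksInput.get(refName);
            return forkedTaskInput;
        } catch (Exception e) {
            // Same message format as the new catch block in the diff above.
            String reason =
                    String.format(
                            "Tasks could not be dynamically forked due to invalid input: %s",
                            e.getMessage());
            throw new TerminateWorkflowException(reason);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> invalidInput = new HashMap<>();
        invalidInput.put("xdt1", "v1"); // a plain string instead of a per-task input map
        try {
            resolveForkedTaskInput(invalidInput, "xdt1");
        } catch (TerminateWorkflowException e) {
            System.out.println(e.getMessage());
        }
    }
}

In the integration tests this surfaces as the workflow moving to FAILED: mid-run in the first spec (the dynamic fork is mapped after 'integration_task_1' completes), and before any task is scheduled in the second spec, which starts the workflow with the invalid input already in place.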