Description
There seems to be a discrepancy in how Conductor OSS handles task inputs compared to Orkes Cloud, specifically in workflows that use FORK_JOIN_DYNAMIC together with JSON_JQ_TRANSFORM or INLINE tasks.
Expected Behavior (Orkes Cloud)
When forkTaskInputs is provided in the following form:
[
  { "param1": "value", "param2": "value" },
  { "param1": "value", "param2": "value" }
]
each sub-workflow receives its input as-is:
{
  "param1": "value",
  "param2": "value"
}
Actual Behavior (Conductor OSS)
In Conductor OSS, when forkTaskInputs comes directly from the output of a JSON_JQ_TRANSFORM or INLINE task, the input to each sub-workflow is wrapped in an additional input field:
{
  "input": {
    "param1": "value",
    "param2": "value"
  }
}
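To make the difference between the two shapes concrete, here is a minimal Python sketch that detects the extra wrapping and strips it. The helper name unwrap_fork_input is hypothetical and not part of Conductor; it only illustrates the shapes described above and could serve as a check inside a sub-workflow worker, assuming no legitimate input ever consists of a single "input" key.

```python
def unwrap_fork_input(payload):
    """Return the inner object if payload looks like {"input": {...}}.

    Hypothetical helper, not a Conductor API. It assumes a real
    sub-workflow input never consists of exactly one key named "input".
    """
    if (
        isinstance(payload, dict)
        and set(payload) == {"input"}
        and isinstance(payload["input"], dict)
    ):
        return payload["input"]
    return payload


# Shape described under "Expected Behavior (Orkes Cloud)"
expected = {"param1": "value", "param2": "value"}
# Shape described under "Actual Behavior (Conductor OSS)"
wrapped = {"input": {"param1": "value", "param2": "value"}}

print(unwrap_fork_input(wrapped) == expected)   # True
print(unwrap_fork_input(expected) == expected)  # True
```

This is only a way to compare the two payloads; it does not address the underlying behavior in the engine.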
Additional Findings
This behavior only happens when the output of JSON_JQ_TRANSFORM or INLINE is used directly by FORK_JOIN_DYNAMIC. If any asynchronous task (such as WAIT, HUMAN, SIMPLE, etc.) is placed in between, the behavior changes: the wrapping disappears, and the parameters are passed correctly.
This suggests that the issue occurs when Conductor OSS passes the raw output of a JSON_JQ_TRANSFORM (or INLINE) task directly into FORK_JOIN_DYNAMIC. In this case, each element gets wrapped in an additional input field. When an asynchronous task is placed in between, the workflow engine presumably serializes and deserializes the intermediate state, which would correct the structure and remove the unwanted nesting.
Minimal workflow definition to reproduce:
{
  "name": "fork_join_dynamic_bug",
  "description": "A minimal reproduction WF for Conductor OSS FORK_JOIN_DYNAMIC bug",
  "version": 1,
  "tasks": [
    {
      "name": "json_transform",
      "taskReferenceName": "json_transform",
      "inputParameters": {
        "forkedProperty": [
          "value1",
          "value2"
        ],
        "queryExpression": ".forkedProperty[] as $property | {property: $property, commonProperty: .commonProperty}",
        "commonProperty": "common value"
      },
      "type": "JSON_JQ_TRANSFORM"
    },
    {
      "name": "fork_join_dynamic",
      "taskReferenceName": "fork_join_dynamic",
      "inputParameters": {
        "forkTaskWorkflow": "sub_workflow_definition_name",
        "forkTaskWorkflowVersion": "1",
        "forkTaskInputs": "${json_transform.output.resultList}"
      },
      "type": "FORK_JOIN_DYNAMIC",
      "dynamicForkTasksParam": "dynamicTasks",
      "dynamicForkTasksInputParamName": "dynamicTasksInput"
    },
    {
      "name": "join",
      "taskReferenceName": "join",
      "inputParameters": {},
      "type": "JOIN"
    }
  ],
  "inputParameters": [],
  "outputParameters": {},
  "schemaVersion": 2,
  "ownerEmail": "[email protected]"
}
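For reference, the list that the queryExpression above should produce for forkTaskInputs can be reproduced outside Conductor. The following Python sketch mirrors the jq expression by hand (it does not call jq or Conductor); the function name expand_fork_inputs is an assumption made for illustration.

```python
import json


def expand_fork_inputs(task_input):
    """Mirror of the jq expression
    .forkedProperty[] as $property | {property: $property, commonProperty: .commonProperty}
    written in plain Python. Hypothetical helper for illustration only.
    """
    # jq yields null for a missing key, so use .get() rather than indexing.
    common = task_input.get("commonProperty")
    return [
        {"property": value, "commonProperty": common}
        for value in task_input["forkedProperty"]
    ]


# Same inputParameters as the json_transform task in the workflow definition.
task_input = {
    "forkedProperty": ["value1", "value2"],
    "commonProperty": "common value",
}

result_list = expand_fork_inputs(task_input)
print(json.dumps(result_list, indent=2))
```

Each element of this list is what a forked sub-workflow would be expected to receive as its input, without the extra input wrapper.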