Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@
)
from ._executors_basic import (
BASIC_ACTION_EXECUTORS,
AppendValueExecutor,
ClearAllVariablesExecutor,
CreateConversationExecutor,
EmitEventExecutor,
ResetVariableExecutor,
SendActivityExecutor,
SetMultipleVariablesExecutor,
Expand All @@ -61,12 +59,10 @@
)
from ._executors_external_input import (
EXTERNAL_INPUT_EXECUTORS,
ConfirmationExecutor,
ExternalInputRequest,
ExternalInputResponse,
QuestionExecutor,
RequestExternalInputExecutor,
WaitForInputExecutor,
)
from ._executors_http import (
HTTP_ACTION_EXECUTORS,
Expand Down Expand Up @@ -122,11 +118,9 @@
"AgentExternalInputRequest",
"AgentExternalInputResponse",
"AgentResult",
"AppendValueExecutor",
"BaseToolExecutor",
"BreakLoopExecutor",
"ClearAllVariablesExecutor",
"ConfirmationExecutor",
"ContinueLoopExecutor",
"ConversationData",
"CreateConversationExecutor",
Expand All @@ -139,7 +133,6 @@
"DeclarativeWorkflowState",
"DefaultHttpRequestHandler",
"DefaultMCPToolHandler",
"EmitEventExecutor",
"EndConversationExecutor",
"EndWorkflowExecutor",
"ExternalInputRequest",
Expand Down Expand Up @@ -173,7 +166,6 @@
"ToolApprovalResponse",
"ToolApprovalState",
"ToolInvocationResult",
"WaitForInputExecutor",
"WorkflowFactory",
"WorkflowState",
]
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
# Action kinds that terminate control flow (no fall-through to successor)
# These actions transfer control elsewhere and should not have sequential edges to the next action
TERMINATOR_ACTIONS = frozenset({
"Goto",
"GotoAction",
"BreakLoop",
"ContinueLoop",
Expand All @@ -80,18 +79,14 @@
ACTION_REQUIRED_FIELDS: dict[str, list[str]] = {
"SetValue": ["path"],
"SetVariable": ["variable"],
"AppendValue": ["path", "value"],
"SendActivity": ["activity"],
"InvokeAzureAgent": ["agent"],
"Goto": ["target"],
"GotoAction": ["actionId"],
"Foreach": ["items", "actions"],
"If": ["condition"],
"Switch": ["value"], # Switch can use value/cases or conditions (ConditionGroup style)
"ConditionGroup": ["conditions"],
"RequestHumanInput": ["variable"],
"WaitForHumanInput": ["variable"],
"EmitEvent": ["event"],
"InvokeFunctionTool": ["functionName"],
"HttpRequestAction": ["url"],
"InvokeMcpTool": ["serverUrl", "toolName"],
Expand All @@ -101,11 +96,12 @@
# Key: "ActionKind.field", Value: list of alternates that satisfy the requirement
ACTION_ALTERNATE_FIELDS: dict[str, list[str]] = {
"SetValue.path": ["variable"],
"Goto.target": ["actionId"],
"GotoAction.actionId": ["target"],
"InvokeAzureAgent.agent": ["agentName"],
"Foreach.items": ["itemsSource", "source"], # source is used in some schemas
"Switch.value": ["conditions"], # Switch can be condition-based instead of value-based
# ConditionGroup accepts either the canonical "conditions" form or the
# legacy "value"/"cases" form (Category 2 schema variant - deferred).
"ConditionGroup.conditions": ["value", "cases"],
}
Comment thread
peibekwe marked this conversation as resolved.


Expand All @@ -115,9 +111,9 @@ class DeclarativeWorkflowBuilder:
This builder transforms declarative action definitions into a proper
workflow graph with executor nodes and edges. It handles:
- Sequential actions (simple edges)
- Conditional branching (If/Switch with condition edges)
- Conditional branching (If/ConditionGroup with condition edges)
- Loops (Foreach with loop edges)
- Jumps (Goto with target edges)
- Jumps (GotoAction with target edges)

Example usage:
yaml_def = {
Expand Down Expand Up @@ -299,7 +295,7 @@ def _validate_actions_recursive(
raise ValueError(f"Action '{kind}' is missing required field '{field}'. Action: {action_def}")

# Collect goto targets for circular reference detection
if kind in ("Goto", "GotoAction"):
if kind == "GotoAction":
target = action_def.get("target") or action_def.get("actionId")
if target:
goto_targets.append((target, explicit_id))
Expand All @@ -313,7 +309,7 @@ def _validate_actions_recursive(
if else_actions:
self._validate_actions_recursive(else_actions, seen_ids, goto_targets, defined_ids)

elif kind in ("Switch", "ConditionGroup"):
elif kind == "ConditionGroup":
cases = action_def.get("cases", action_def.get("conditions", []))
for case in cases:
case_actions = case.get("actions", [])
Expand Down Expand Up @@ -362,7 +358,7 @@ def _validate_no_circular_gotos(
# Check for direct self-reference
if source_id and target_id == source_id:
raise ValueError(
f"Action '{source_id}' has a direct self-referencing Goto, which would cause an infinite loop."
f"Action '{source_id}' has a direct self-referencing GotoAction, which would cause an infinite loop."
)

def _resolve_pending_gotos(self, builder: WorkflowBuilder) -> None:
Expand All @@ -380,7 +376,7 @@ def _resolve_pending_gotos(self, builder: WorkflowBuilder) -> None:
builder.add_edge(source=goto_executor, target=target_executor)
else:
available_ids = list(self._executors.keys())
raise ValueError(f"Goto target '{target_id}' not found. Available action IDs: {available_ids}")
raise ValueError(f"GotoAction target '{target_id}' not found. Available action IDs: {available_ids}")

def _create_executors_for_actions(
self,
Expand Down Expand Up @@ -453,11 +449,11 @@ def _create_executor_for_action(
# Handle special control flow actions
if kind == "If":
return self._create_if_structure(action_def, builder, parent_context)
if kind == "Switch" or kind == "ConditionGroup":
if kind == "ConditionGroup":
return self._create_switch_structure(action_def, builder, parent_context)
if kind == "Foreach":
return self._create_foreach_structure(action_def, builder, parent_context)
if kind == "Goto" or kind == "GotoAction":
if kind == "GotoAction":
return self._create_goto_reference(action_def, builder, parent_context)
if kind == "BreakLoop":
return self._create_break_executor(action_def, builder, parent_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,28 +179,6 @@ async def handle_action(
await ctx.send_message(ActionComplete())


class AppendValueExecutor(DeclarativeActionExecutor):
"""Executor for the AppendValue action."""

@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Handle the AppendValue action."""
state = await self._ensure_state_initialized(ctx, trigger)

path = self._action_def.get("path")
value = self._action_def.get("value")

if path:
evaluated_value = state.eval_if_expression(value)
state.append(path, evaluated_value)

await ctx.send_message(ActionComplete())


class ResetVariableExecutor(DeclarativeActionExecutor):
"""Executor for the ResetVariable action."""

Expand Down Expand Up @@ -279,47 +257,6 @@ async def handle_action(
await ctx.send_message(ActionComplete())


class EmitEventExecutor(DeclarativeActionExecutor):
"""Executor for the EmitEvent action.

Emits a custom event to the workflow event stream.

Supports two schema formats:
1. Graph mode: eventName, eventValue
2. Interpreter mode: event.name, event.data
"""

@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete, dict[str, Any]],
) -> None:
"""Handle the EmitEvent action."""
state = await self._ensure_state_initialized(ctx, trigger)

# Support both schema formats:
# - Graph mode: eventName, eventValue
# - Interpreter mode: event.name, event.data
event_def = self._action_def.get("event", {})
event_name = self._action_def.get("eventName") or event_def.get("name", "")
event_value = self._action_def.get("eventValue")
if event_value is None:
event_value = event_def.get("data")

if event_name:
evaluated_name = state.eval_if_expression(event_name)
evaluated_value = state.eval_if_expression(event_value)

event_data = {
"eventName": evaluated_name,
"eventValue": evaluated_value,
}
await ctx.yield_output(event_data)

await ctx.send_message(ActionComplete())


class EditTableExecutor(DeclarativeActionExecutor):
"""Executor for the EditTable action.

Expand Down Expand Up @@ -628,11 +565,9 @@ def _convert_to_type(self, value: Any, target_type: str) -> Any:
"SetVariable": SetVariableExecutor,
"SetTextVariable": SetTextVariableExecutor,
"SetMultipleVariables": SetMultipleVariablesExecutor,
"AppendValue": AppendValueExecutor,
"ResetVariable": ResetVariableExecutor,
"ClearAllVariables": ClearAllVariablesExecutor,
"SendActivity": SendActivityExecutor,
"EmitEvent": EmitEventExecutor,
"ParseValue": ParseValueExecutor,
"EditTable": EditTableExecutor,
"EditTableV2": EditTableV2Executor,
Expand Down
Loading
Loading