Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/packages/core/agent_framework/_workflows/_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def __init__(self) -> None:
"""

condition: Callable[[Any], bool]
target: Executor
target: Executor | str


@dataclass
Expand All @@ -255,7 +255,7 @@ def __init__(self) -> None:
assert fallback.target.id == "dead_letter"
"""

target: Executor
target: Executor | str


@dataclass(init=False)
Expand Down
21 changes: 9 additions & 12 deletions python/packages/core/agent_framework/_workflows/_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,44 +101,41 @@ class WorkflowGraphValidator:
def __init__(self) -> None:
self._edges: list[Edge] = []
self._executors: dict[str, Executor] = {}
self._start_executor_ref: Executor | str | None = None

# region Core Validation Methods
def validate_workflow(
self,
edge_groups: Sequence[EdgeGroup],
executors: dict[str, Executor],
start_executor: Executor | str,
start_executor: Executor,
) -> None:
"""Validate the entire workflow graph.

Args:
edge_groups: list of edge groups in the workflow
executors: Map of executor IDs to executor instances
start_executor: The starting executor (can be instance or ID)
start_executor: The starting executor

Raises:
WorkflowValidationError: If any validation fails
"""
self._executors = executors
self._edges = [edge for group in edge_groups for edge in group.edges]
self._edge_groups = edge_groups
self._start_executor_ref = start_executor

# If only the start executor exists, add it to the executor map
# Handle the special case where the workflow consists of only a single executor and no edges.
# In this scenario, the executor map will be empty because there are no edge groups to reference executors.
# Adding the start executor to the map ensures that single-executor workflows (without any edges) are supported,
# allowing validation and execution to proceed for workflows that do not require inter-executor communication.
if not self._executors and start_executor and isinstance(start_executor, Executor):
if not self._executors:
self._executors[start_executor.id] = start_executor

# Validate that start_executor exists in the graph
# It should because we check for it in the WorkflowBuilder
# but we do it here for completeness.
start_executor_id = start_executor.id if isinstance(start_executor, Executor) else start_executor
if start_executor_id not in self._executors:
raise GraphConnectivityError(f"Start executor '{start_executor_id}' is not present in the workflow graph")
if start_executor.id not in self._executors:
raise GraphConnectivityError(f"Start executor '{start_executor.id}' is not present in the workflow graph")

# Additional presence verification:
# A start executor that is only injected via the builder (present in the executors map)
Expand All @@ -152,16 +149,16 @@ def validate_workflow(
for e in self._edges:
edge_executor_ids.add(e.source_id)
edge_executor_ids.add(e.target_id)
if start_executor_id not in edge_executor_ids:
if start_executor.id not in edge_executor_ids:
raise GraphConnectivityError(
f"Start executor '{start_executor_id}' is not present in the workflow graph"
f"Start executor '{start_executor.id}' is not present in the workflow graph"
)

# Run all checks
self._validate_edge_duplication()
self._validate_handler_output_annotations()
self._validate_type_compatibility()
self._validate_graph_connectivity(start_executor_id)
self._validate_graph_connectivity(start_executor.id)
self._validate_self_loops()
self._validate_dead_ends()

Expand Down Expand Up @@ -398,7 +395,7 @@ def _validate_dead_ends(self) -> None:
def validate_workflow_graph(
edge_groups: Sequence[EdgeGroup],
executors: dict[str, Executor],
start_executor: Executor | str,
start_executor: Executor,
) -> None:
"""Convenience function to validate a workflow graph.

Expand Down
9 changes: 3 additions & 6 deletions python/packages/core/agent_framework/_workflows/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def __init__(
self,
edge_groups: list[EdgeGroup],
executors: dict[str, Executor],
start_executor: Executor | str,
start_executor: Executor,
runner_context: RunnerContext,
max_iterations: int = DEFAULT_MAX_ITERATIONS,
name: str | None = None,
Expand All @@ -192,19 +192,16 @@ def __init__(
Args:
edge_groups: A list of EdgeGroup instances that define the workflow edges.
executors: A dictionary mapping executor IDs to Executor instances.
start_executor: The starting executor for the workflow, which can be an Executor instance or its ID.
start_executor: The starting executor for the workflow.
runner_context: The RunnerContext instance to be used during workflow execution.
max_iterations: The maximum number of iterations the workflow will run for convergence.
name: Optional human-readable name for the workflow.
description: Optional description of what the workflow does.
kwargs: Additional keyword arguments. Unused in this implementation.
"""
# Convert start_executor to string ID if it's an Executor instance
start_executor_id = start_executor.id if isinstance(start_executor, Executor) else start_executor

self.edge_groups = list(edge_groups)
self.executors = dict(executors)
self.start_executor_id = start_executor_id
self.start_executor_id = start_executor.id
self.max_iterations = max_iterations
self.id = str(uuid.uuid4())
self.name = name
Expand Down
Loading
Loading