diff --git a/python/packages/core/agent_framework/_workflows/_viz.py b/python/packages/core/agent_framework/_workflows/_viz.py index 14011cb5a5..0fcf8af32d 100644 --- a/python/packages/core/agent_framework/_workflows/_viz.py +++ b/python/packages/core/agent_framework/_workflows/_viz.py @@ -7,16 +7,16 @@ from pathlib import Path from typing import Literal -from ._edge import FanInEdgeGroup +from ._edge import FanInEdgeGroup, InternalEdgeGroup from ._workflow import Workflow # Import of WorkflowExecutor is performed lazily inside methods to avoid cycles -"""Workflow visualization module using graphviz.""" +"""Workflow visualization module using graphviz and Mermaid.""" class WorkflowViz: - """A class for visualizing workflows using graphviz.""" + """A class for visualizing workflows using graphviz and Mermaid.""" def __init__(self, workflow: Workflow): """Initialize the WorkflowViz with a workflow. @@ -26,9 +26,13 @@ def __init__(self, workflow: Workflow): """ self._workflow = workflow - def to_digraph(self) -> str: + def to_digraph(self, include_internal_executors: bool = False) -> str: """Export the workflow as a DOT format digraph string. + Args: + include_internal_executors (bool): Whether to include internal executors in the visualization. + Default is False. + Returns: A string representation of the workflow in DOT format. """ @@ -39,20 +43,37 @@ def to_digraph(self) -> str: lines.append("") # Emit the top-level workflow nodes/edges - self._emit_workflow_digraph(self._workflow, lines, indent=" ") + self._emit_workflow_digraph( + self._workflow, + lines, + indent=" ", + include_internal_executors=include_internal_executors, + ) # Emit sub-workflows hosted by WorkflowExecutor as nested clusters - self._emit_sub_workflows_digraph(self._workflow, lines, indent=" ") + self._emit_sub_workflows_digraph( + self._workflow, + lines, + indent=" ", + include_internal_executors=include_internal_executors, + ) lines.append("}") return "\n".join(lines) - def export(self, format: Literal["svg", "png", "pdf", "dot"] = "svg", filename: str | None = None) -> str: + def export( + self, + format: Literal["svg", "png", "pdf", "dot"] = "svg", + filename: str | None = None, + include_internal_executors: bool = False, + ) -> str: """Export the workflow visualization to a file or return the file path. Args: format: The output format. Supported formats: 'svg', 'png', 'pdf', 'dot'. filename: Optional filename to save the output. If None, creates a temporary file. + include_internal_executors (bool): Whether to include internal executors in the visualization. + Default is False. Returns: The path to the saved file. @@ -66,7 +87,7 @@ def export(self, format: Literal["svg", "png", "pdf", "dot"] = "svg", filename: raise ValueError(f"Unsupported format: {format}. Supported formats: svg, png, pdf, dot") if format == "dot": - content = self.to_digraph() + content = self.to_digraph(include_internal_executors=include_internal_executors) if filename: with open(filename, "w", encoding="utf-8") as f: f.write(content) @@ -87,7 +108,7 @@ def export(self, format: Literal["svg", "png", "pdf", "dot"] = "svg", filename: ) from e # Create a temporary graphviz Source object - dot_content = self.to_digraph() + dot_content = self.to_digraph(include_internal_executors=include_internal_executors) source = graphviz.Source(dot_content) try: @@ -99,7 +120,7 @@ def export(self, format: Literal["svg", "png", "pdf", "dot"] = "svg", filename: # Remove extension if present since graphviz.render() adds it base_name = str(output_path.with_suffix("")) - source.render(base_name, format=format, cleanup=True) + source.render(base_name, format=format, cleanup=True) # type: ignore # Return the actual filename with extension return f"{base_name}.{format}" @@ -108,7 +129,7 @@ def export(self, format: Literal["svg", "png", "pdf", "dot"] = "svg", filename: temp_path = Path(temp_file.name) base_name = str(temp_path.with_suffix("")) - source.render(base_name, format=format, cleanup=True) + source.render(base_name, format=format, cleanup=True) # type: ignore return f"{base_name}.{format}" except graphviz.backend.execute.ExecutableNotFound as e: raise ImportError( @@ -118,60 +139,72 @@ def export(self, format: Literal["svg", "png", "pdf", "dot"] = "svg", filename: "brew install graphviz on macOS, or download from https://graphviz.org/download/ for other platforms." ) from e - def save_svg(self, filename: str) -> str: + def save_svg(self, filename: str, include_internal_executors: bool = False) -> str: """Convenience method to save as SVG. Args: filename: The filename to save the SVG file. + include_internal_executors (bool): Whether to include internal executors in the visualization. + Default is False. Returns: The path to the saved SVG file. """ - return self.export(format="svg", filename=filename) + return self.export(format="svg", filename=filename, include_internal_executors=include_internal_executors) - def save_png(self, filename: str) -> str: + def save_png(self, filename: str, include_internal_executors: bool = False) -> str: """Convenience method to save as PNG. Args: filename: The filename to save the PNG file. + include_internal_executors (bool): Whether to include internal executors in the visualization. + Default is False. Returns: The path to the saved PNG file. """ - return self.export(format="png", filename=filename) + return self.export(format="png", filename=filename, include_internal_executors=include_internal_executors) - def save_pdf(self, filename: str) -> str: + def save_pdf(self, filename: str, include_internal_executors: bool = False) -> str: """Convenience method to save as PDF. Args: filename: The filename to save the PDF file. + include_internal_executors (bool): Whether to include internal executors in the visualization. + Default is False. Returns: The path to the saved PDF file. """ - return self.export(format="pdf", filename=filename) + return self.export(format="pdf", filename=filename, include_internal_executors=include_internal_executors) - def to_mermaid(self) -> str: + def to_mermaid(self, include_internal_executors: bool = False) -> str: """Export the workflow as a Mermaid flowchart string. + Args: + include_internal_executors (bool): Whether to include internal executors in the visualization. + Default is False. + Returns: A string representation of the workflow in Mermaid flowchart syntax. """ - - def _san(s: str) -> str: - """Sanitize an ID for Mermaid (alphanumeric and underscore, start with letter).""" - s2 = re.sub(r"[^0-9A-Za-z_]", "_", s) - if not s2 or not s2[0].isalpha(): - s2 = f"n_{s2}" - return s2 - lines: list[str] = ["flowchart TD"] # Emit top-level workflow - self._emit_workflow_mermaid(self._workflow, lines, indent=" ") + self._emit_workflow_mermaid( + self._workflow, + lines, + indent=" ", + include_internal_executors=include_internal_executors, + ) # Emit sub-workflows as Mermaid subgraphs - self._emit_sub_workflows_mermaid(self._workflow, lines, indent=" ") + self._emit_sub_workflows_mermaid( + self._workflow, + lines, + indent=" ", + include_internal_executors=include_internal_executors, + ) return "\n".join(lines) @@ -181,13 +214,13 @@ def _fan_in_digest(self, target: str, sources: list[str]) -> str: sources_sorted = sorted(sources) return hashlib.sha256((target + "|" + "|".join(sources_sorted)).encode("utf-8")).hexdigest()[:8] - def _compute_fan_in_descriptors(self, wf: Workflow | None = None) -> list[tuple[str, list[str], str]]: + def _compute_fan_in_descriptors(self, workflow: Workflow | None = None) -> list[tuple[str, list[str], str]]: """Return list of (node_id, sources, target) for fan-in groups. node_id is DOT-oriented: fan_in::target::digest """ result: list[tuple[str, list[str], str]] = [] - workflow = wf or self._workflow + workflow = workflow or self._workflow for group in workflow.edge_groups: if isinstance(group, FanInEdgeGroup): target = group.target_executor_ids[0] @@ -197,13 +230,19 @@ def _compute_fan_in_descriptors(self, wf: Workflow | None = None) -> list[tuple[ result.append((node_id, sorted(sources), target)) return result - def _compute_normal_edges(self, wf: Workflow | None = None) -> list[tuple[str, str, bool]]: + def _compute_normal_edges( + self, + workflow: Workflow | None = None, + include_internal_executors: bool = False, + ) -> list[tuple[str, str, bool]]: """Return list of (source_id, target_id, is_conditional) for non-fan-in groups.""" edges: list[tuple[str, str, bool]] = [] - workflow = wf or self._workflow + workflow = workflow or self._workflow for group in workflow.edge_groups: if isinstance(group, FanInEdgeGroup): continue + if isinstance(group, InternalEdgeGroup) and not include_internal_executors: + continue for edge in group.edges: is_cond = getattr(edge, "_condition", None) is not None edges.append((edge.source_id, edge.target_id, is_cond)) @@ -213,7 +252,14 @@ def _compute_normal_edges(self, wf: Workflow | None = None) -> list[tuple[str, s # region Internal emitters (DOT) - def _emit_workflow_digraph(self, wf: Workflow, lines: list[str], indent: str, ns: str | None = None) -> None: + def _emit_workflow_digraph( + self, + workflow: Workflow, + lines: list[str], + indent: str, + ns: str | None = None, + include_internal_executors: bool = False, + ) -> None: """Emit DOT nodes/edges for the given workflow. If ns (namespace) is provided, node ids are prefixed with f"{ns}/" for uniqueness, @@ -224,16 +270,16 @@ def map_id(x: str) -> str: return f"{ns}/{x}" if ns else x # Nodes - start_executor_id = wf.start_executor_id + start_executor_id = workflow.start_executor_id lines.append( f'{indent}"{map_id(start_executor_id)}" [fillcolor=lightgreen, label="{start_executor_id}\\n(Start)"];' ) - for executor_id in wf.executors: + for executor_id in workflow.executors: if executor_id != start_executor_id: lines.append(f'{indent}"{map_id(executor_id)}" [label="{executor_id}"];') # Fan-in nodes - fan_in_nodes = self._compute_fan_in_descriptors(wf) + fan_in_nodes = self._compute_fan_in_descriptors(workflow) if fan_in_nodes: lines.append("") for node_id, _, _ in fan_in_nodes: @@ -246,11 +292,19 @@ def map_id(x: str) -> str: lines.append(f'{indent}"{map_id(node_id)}" -> "{map_id(target)}";') # Normal edges - for src, tgt, is_cond in self._compute_normal_edges(wf): + for src, tgt, is_cond in self._compute_normal_edges( + workflow, include_internal_executors=include_internal_executors + ): edge_attr = ' [style=dashed, label="conditional"]' if is_cond else "" lines.append(f'{indent}"{map_id(src)}" -> "{map_id(tgt)}"{edge_attr};') - def _emit_sub_workflows_digraph(self, wf: Workflow, lines: list[str], indent: str) -> None: + def _emit_sub_workflows_digraph( + self, + workflow: Workflow, + lines: list[str], + indent: str, + include_internal_executors: bool = False, + ) -> None: """Emit DOT subgraphs for any WorkflowExecutor instances found in the workflow.""" # Lazy import to avoid any potential import cycles try: @@ -258,7 +312,7 @@ def _emit_sub_workflows_digraph(self, wf: Workflow, lines: list[str], indent: st except ImportError: # pragma: no cover - best-effort; if unavailable, skip subgraphs return - for exec_id, exec_obj in wf.executors.items(): + for exec_id, exec_obj in workflow.executors.items(): if isinstance(exec_obj, WorkflowExecutor) and hasattr(exec_obj, "workflow") and exec_obj.workflow: subgraph_id = f"cluster_{uuid.uuid5(uuid.NAMESPACE_OID, exec_id).hex[:8]}" lines.append(f"{indent}subgraph {subgraph_id} {{") @@ -267,10 +321,21 @@ def _emit_sub_workflows_digraph(self, wf: Workflow, lines: list[str], indent: st # Emit the nested workflow inside this cluster using a namespace ns = exec_id - self._emit_workflow_digraph(exec_obj.workflow, lines, indent=f"{indent} ", ns=ns) + self._emit_workflow_digraph( + exec_obj.workflow, + lines, + indent=f"{indent} ", + ns=ns, + include_internal_executors=include_internal_executors, + ) # Recurse into deeper nested sub-workflows - self._emit_sub_workflows_digraph(exec_obj.workflow, lines, indent=f"{indent} ") + self._emit_sub_workflows_digraph( + exec_obj.workflow, + lines, + indent=f"{indent} ", + include_internal_executors=include_internal_executors, + ) lines.append(f"{indent}}}") @@ -278,7 +343,14 @@ def _emit_sub_workflows_digraph(self, wf: Workflow, lines: list[str], indent: st # region Internal emitters (Mermaid) - def _emit_workflow_mermaid(self, wf: Workflow, lines: list[str], indent: str, ns: str | None = None) -> None: + def _emit_workflow_mermaid( + self, + workflow: Workflow, + lines: list[str], + indent: str, + ns: str | None = None, + include_internal_executors: bool = False, + ) -> None: def _san(s: str) -> str: s2 = re.sub(r"[^0-9A-Za-z_]", "_", s) if not s2 or not s2[0].isalpha(): @@ -291,15 +363,15 @@ def map_id(x: str) -> str: return _san(x) # Nodes - start_executor_id = wf.start_executor_id + start_executor_id = workflow.start_executor_id lines.append(f'{indent}{map_id(start_executor_id)}["{start_executor_id} (Start)"];') - for executor_id in wf.executors: + for executor_id in workflow.executors: if executor_id == start_executor_id: continue lines.append(f'{indent}{map_id(executor_id)}["{executor_id}"];') # Fan-in nodes - fan_in_nodes_dot = self._compute_fan_in_descriptors(wf) + fan_in_nodes_dot = self._compute_fan_in_descriptors(workflow) fan_in_nodes: list[tuple[str, list[str], str]] = [] for dot_node_id, sources, target in fan_in_nodes_dot: digest = dot_node_id.split("::")[-1] @@ -318,7 +390,9 @@ def map_id(x: str) -> str: lines.append(f"{indent}{fan_node_id} --> {map_id(target)};") # Normal edges - for src, tgt, is_cond in self._compute_normal_edges(wf): + for src, tgt, is_cond in self._compute_normal_edges( + workflow, include_internal_executors=include_internal_executors + ): s = map_id(src) t = map_id(tgt) if is_cond: @@ -326,7 +400,13 @@ def map_id(x: str) -> str: else: lines.append(f"{indent}{s} --> {t};") - def _emit_sub_workflows_mermaid(self, wf: Workflow, lines: list[str], indent: str) -> None: + def _emit_sub_workflows_mermaid( + self, + workflow: Workflow, + lines: list[str], + indent: str, + include_internal_executors: bool = False, + ) -> None: try: from ._workflow_executor import WorkflowExecutor # type: ignore except ImportError: # pragma: no cover @@ -338,14 +418,25 @@ def _san(s: str) -> str: s2 = f"n_{s2}" return s2 - for exec_id, exec_obj in wf.executors.items(): + for exec_id, exec_obj in workflow.executors.items(): if isinstance(exec_obj, WorkflowExecutor) and hasattr(exec_obj, "workflow") and exec_obj.workflow: sg_id = _san(exec_id) lines.append(f"{indent}subgraph {sg_id}") # Render nested workflow within this subgraph using namespacing - self._emit_workflow_mermaid(exec_obj.workflow, lines, indent=f"{indent} ", ns=exec_id) + self._emit_workflow_mermaid( + exec_obj.workflow, + lines, + indent=f"{indent} ", + ns=exec_id, + include_internal_executors=include_internal_executors, + ) # Recurse into deeper sub-workflows - self._emit_sub_workflows_mermaid(exec_obj.workflow, lines, indent=f"{indent} ") + self._emit_sub_workflows_mermaid( + exec_obj.workflow, + lines, + indent=f"{indent} ", + include_internal_executors=include_internal_executors, + ) lines.append(f"{indent}end") # endregion diff --git a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py index 21a9ff4b08..81545b75c7 100644 --- a/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py +++ b/python/samples/getting_started/workflows/visualization/concurrent_with_visualization.py @@ -6,14 +6,12 @@ from agent_framework import ( AgentExecutorRequest, AgentExecutorResponse, - AgentRunEvent, ChatAgent, ChatMessage, Executor, Role, WorkflowBuilder, WorkflowContext, - WorkflowOutputEvent, WorkflowViz, handler, ) @@ -124,7 +122,7 @@ def create_legal_agent() -> ChatAgent: async def main() -> None: """Build and run the concurrent workflow with visualization.""" - # 1) Build a simple fan-out/fan-in workflow + # Build a simple fan-out/fan-in workflow workflow = ( WorkflowBuilder() .register_agent(create_researcher_agent, name="researcher") @@ -138,31 +136,22 @@ async def main() -> None: .build() ) - # 1.5) Generate workflow visualization + # Generate workflow visualization print("Generating workflow visualization...") viz = WorkflowViz(workflow) # Print out the mermaid string. print("Mermaid string: \n=======") print(viz.to_mermaid()) print("=======") - # Print out the DiGraph string. + # Print out the DiGraph string with internal executors. print("DiGraph string: \n=======") - print(viz.to_digraph()) + print(viz.to_digraph(include_internal_executors=True)) print("=======") # Export the DiGraph visualization as SVG. svg_file = viz.export(format="svg") print(f"SVG file saved to: {svg_file}") - # 2) Run with a single prompt - async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."): - if isinstance(event, AgentRunEvent): - # Show which agent ran and what step completed. - print(event) - elif isinstance(event, WorkflowOutputEvent): - print("===== Final Aggregated Output =====") - print(event.data) - if __name__ == "__main__": asyncio.run(main())