diff --git a/dapr_agents/__init__.py b/dapr_agents/__init__.py index b6bc9a542..2dfdd2b3b 100644 --- a/dapr_agents/__init__.py +++ b/dapr_agents/__init__.py @@ -32,7 +32,6 @@ "OpenAIEmbeddingClient", "AgentTool", "tool", - "AgenticWorkflow", "LLMOrchestrator", "RandomOrchestrator", "RoundRobinOrchestrator", diff --git a/dapr_agents/agents/orchestrators/llm/orchestrator.py b/dapr_agents/agents/orchestrators/llm/orchestrator.py index 61dfad227..1b13224e9 100644 --- a/dapr_agents/agents/orchestrators/llm/orchestrator.py +++ b/dapr_agents/agents/orchestrators/llm/orchestrator.py @@ -14,7 +14,7 @@ BroadcastMessage, TriggerAction, ) -from dapr_agents.workflow.decorators.routers import message_router +from dapr_agents.workflow.decorators.decorators import message_router from dapr_agents.workflow.decorators import workflow_entry from dapr_agents.agents.orchestrators.llm.prompts import ( NEXT_STEP_PROMPT, diff --git a/dapr_agents/agents/orchestrators/random.py b/dapr_agents/agents/orchestrators/random.py index 89ee580af..fff9032ce 100644 --- a/dapr_agents/agents/orchestrators/random.py +++ b/dapr_agents/agents/orchestrators/random.py @@ -20,7 +20,7 @@ BroadcastMessage, TriggerAction, ) -from dapr_agents.workflow.decorators.routers import message_router +from dapr_agents.workflow.decorators.decorators import message_router from dapr_agents.workflow.decorators import workflow_entry from dapr_agents.workflow.utils.pubsub import broadcast_message, send_message_to_agent diff --git a/dapr_agents/agents/orchestrators/roundrobin.py b/dapr_agents/agents/orchestrators/roundrobin.py index bd300d0b3..2b0ab4dc0 100644 --- a/dapr_agents/agents/orchestrators/roundrobin.py +++ b/dapr_agents/agents/orchestrators/roundrobin.py @@ -19,7 +19,7 @@ BroadcastMessage, TriggerAction, ) -from dapr_agents.workflow.decorators.routers import message_router +from dapr_agents.workflow.decorators.decorators import message_router from dapr_agents.workflow.decorators import workflow_entry from 
dapr_agents.workflow.utils.pubsub import broadcast_message, send_message_to_agent diff --git a/dapr_agents/workflow/__init__.py b/dapr_agents/workflow/__init__.py index 6362a54ba..689334e02 100644 --- a/dapr_agents/workflow/__init__.py +++ b/dapr_agents/workflow/__init__.py @@ -1,15 +1,11 @@ from .decorators import ( message_router, http_router, - llm_activity, - agent_activity, workflow_entry, ) __all__ = [ "message_router", "http_router", - "llm_activity", - "agent_activity", "workflow_entry", ] diff --git a/dapr_agents/workflow/decorators/__init__.py b/dapr_agents/workflow/decorators/__init__.py index 8d23a1d85..716324395 100644 --- a/dapr_agents/workflow/decorators/__init__.py +++ b/dapr_agents/workflow/decorators/__init__.py @@ -1,10 +1,7 @@ -from .routers import message_router, http_router -from .activities import llm_activity, agent_activity, workflow_entry +from .decorators import message_router, http_router, workflow_entry __all__ = [ "message_router", "http_router", - "llm_activity", - "agent_activity", "workflow_entry", ] diff --git a/dapr_agents/workflow/decorators/activities.py b/dapr_agents/workflow/decorators/activities.py deleted file mode 100644 index b3a696d12..000000000 --- a/dapr_agents/workflow/decorators/activities.py +++ /dev/null @@ -1,216 +0,0 @@ -from __future__ import annotations - -import asyncio -import functools -import inspect -import logging -from typing import Any, Callable, Literal, Optional, TypeVar - -from dapr.ext.workflow import WorkflowActivityContext # type: ignore - -from dapr_agents.agents.base import AgentBase -from dapr_agents.llm.chat import ChatClientBase -from dapr_agents.workflow.utils.activities import ( - build_llm_params, - convert_result, - extract_ctx_and_payload, - format_agent_input, - format_prompt, - normalize_input, - strip_context_parameter, - validate_result, -) - -logger = logging.getLogger(__name__) - -R = TypeVar("R") - - -def workflow_entry(func: Callable[..., R]) -> Callable[..., R]: - """ - Mark a 
method/function as the workflow entrypoint for an Agent. - - This decorator does not wrap the function; it simply annotates the callable - with `_is_workflow_entry = True` so AgentRunner can discover it on the agent - instance via reflection. - - Usage: - class MyAgent: - @workflow_entry - def my_workflow(self, ctx: DaprWorkflowContext, wf_input: dict) -> str: - ... - - Returns: - The same callable (unmodified), with an identifying attribute. - """ - setattr(func, "_is_workflow_entry", True) # type: ignore[attr-defined] - return func - - -def llm_activity( - *, - prompt: str, - llm: ChatClientBase, - structured_mode: Literal["json", "function_call"] = "json", - **task_kwargs: Any, -) -> Callable[[Callable[..., Any]], Callable[..., Any]]: - """Delegate an activity's implementation to an LLM. - - The decorated function's body is not executed directly. Instead: - 1) Build a prompt from the activity's signature + `prompt` - 2) Call the provided LLM client - 3) Validate the result against the activity's return annotation - - Args: - prompt: Prompt template (e.g., "Summarize {text} in 3 bullets.") - llm: Chat client capable of `generate(**params)`. - structured_mode: Provider structured output mode ("json" or "function_call"). - **task_kwargs: Reserved for future routing/provider knobs. - - Returns: - A wrapper suitable to register as a Dapr activity. - - Raises: - ValueError: If `prompt` is empty or `llm` is missing. 
- """ - if not prompt: - raise ValueError("@llm_activity requires a prompt template.") - if llm is None: - raise ValueError("@llm_activity requires an explicit `llm` client instance.") - - def decorator(func: Callable[..., Any]) -> Callable[..., Any]: - if not callable(func): - raise ValueError("@llm_activity must decorate a callable activity.") - - original_sig = inspect.signature(func) - activity_sig = strip_context_parameter(original_sig) - effective_structured_mode = task_kwargs.get("structured_mode", structured_mode) - - async def _execute(ctx: WorkflowActivityContext, payload: Any = None) -> Any: - """Run the LLM pipeline inside the worker.""" - normalized = ( - normalize_input(activity_sig, payload) if payload is not None else {} - ) - - formatted_prompt = format_prompt(activity_sig, prompt, normalized) - params = build_llm_params( - activity_sig, formatted_prompt, effective_structured_mode - ) - - raw = llm.generate(**params) - if inspect.isawaitable(raw): - raw = await raw - - converted = convert_result(raw) - validated = await validate_result(converted, activity_sig) - return validated - - @functools.wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - """Sync activity wrapper: execute async pipeline to completion.""" - ctx, payload = extract_ctx_and_payload(args, dict(kwargs)) - result = _execute(ctx, payload) # coroutine - - # If we're in a thread with an active loop, run thread-safely - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - - if loop and loop.is_running(): - fut = asyncio.run_coroutine_threadsafe(result, loop) - return fut.result() - - # Otherwise create and run a fresh loop - return asyncio.run(result) - - # Useful metadata for debugging/inspection - wrapper._is_llm_activity = True # noqa: SLF001 - wrapper._llm_activity_config = { # noqa: SLF001 - "prompt": prompt, - "structured_mode": effective_structured_mode, - "task_kwargs": task_kwargs, - } - wrapper._original_activity = func # noqa: SLF001 - 
return wrapper - - return decorator - - -def agent_activity( - *, - agent: AgentBase, - prompt: Optional[str] = None, - **task_kwargs: Any, -) -> Callable[[Callable[..., Any]], Callable[..., Any]]: - """Route an activity through an `AgentBase`. - - The agent receives either a formatted `prompt` or a natural-language - rendering of the payload. The result is validated against the activity's return - annotation. - - Args: - agent: Agent to run the activity through. - prompt: Optional prompt template for the agent. - **task_kwargs: Reserved for future routing/provider knobs. - - Returns: - A wrapper suitable to register as a Dapr activity. - - Raises: - ValueError: If `agent` is missing. - """ - if agent is None: - raise ValueError("@agent_activity requires an AgentBase instance.") - - def decorator(func: Callable[..., Any]) -> Callable[..., Any]: - if not callable(func): - raise ValueError("@agent_activity must decorate a callable activity.") - - original_sig = inspect.signature(func) - activity_sig = strip_context_parameter(original_sig) - prompt_template = prompt or "" - - async def _execute(ctx: WorkflowActivityContext, payload: Any = None) -> Any: - normalized = ( - normalize_input(activity_sig, payload) if payload is not None else {} - ) - - if prompt_template: - formatted_prompt = format_prompt( - activity_sig, prompt_template, normalized - ) - else: - formatted_prompt = format_agent_input(payload, normalized) - - raw = await agent.run(formatted_prompt) - converted = convert_result(raw) - validated = await validate_result(converted, activity_sig) - return validated - - @functools.wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - """Sync activity wrapper: execute async pipeline to completion.""" - ctx, payload = extract_ctx_and_payload(args, dict(kwargs)) - result = _execute(ctx, payload) # coroutine - - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - - if loop and loop.is_running(): - fut = 
asyncio.run_coroutine_threadsafe(result, loop) - return fut.result() - - return asyncio.run(result) - - wrapper._is_agent_activity = True # noqa: SLF001 - wrapper._agent_activity_config = { # noqa: SLF001 - "prompt": prompt, - "task_kwargs": task_kwargs, - } - wrapper._original_activity = func # noqa: SLF001 - return wrapper - - return decorator diff --git a/dapr_agents/workflow/decorators/routers.py b/dapr_agents/workflow/decorators/decorators.py similarity index 89% rename from dapr_agents/workflow/decorators/routers.py rename to dapr_agents/workflow/decorators/decorators.py index 1f46d88a6..70fdbfdf8 100644 --- a/dapr_agents/workflow/decorators/routers.py +++ b/dapr_agents/workflow/decorators/decorators.py @@ -2,7 +2,7 @@ import logging from copy import deepcopy -from typing import Any, Callable, List, Literal, Optional, Type, get_type_hints +from typing import Any, Callable, List, Literal, Optional, Type, TypeVar, get_type_hints from dapr_agents.workflow.utils.core import is_supported_model from dapr_agents.workflow.utils.routers import extract_message_models @@ -11,6 +11,29 @@ HttpMethod = Literal["GET", "POST", "PUT", "PATCH", "DELETE"] +R = TypeVar("R") + + +def workflow_entry(func: Callable[..., R]) -> Callable[..., R]: + """ + Mark a method/function as the workflow entrypoint for an Agent. + + This decorator does not wrap the function; it simply annotates the callable + with `_is_workflow_entry = True` so AgentRunner can discover it on the agent + instance via reflection. + + Usage: + class MyAgent: + @workflow_entry + def my_workflow(self, ctx: DaprWorkflowContext, wf_input: dict) -> str: + ... + + Returns: + The same callable (unmodified), with an identifying attribute. 
+ """ + setattr(func, "_is_workflow_entry", True) # type: ignore[attr-defined] + return func + def message_router( func: Optional[Callable[..., Any]] = None, diff --git a/dapr_agents/workflow/runners/agent.py b/dapr_agents/workflow/runners/agent.py index e8ce3592b..a4c4db39a 100644 --- a/dapr_agents/workflow/runners/agent.py +++ b/dapr_agents/workflow/runners/agent.py @@ -608,7 +608,11 @@ def shutdown( # First verify we're managing it with self._lock: if agent in self._managed_agents: - agent.instrumentor.uninstrument() + try: + agent.instrumentor.uninstrument() + except AttributeError: + # this happens if the agent has no instrumentor + pass agent.stop() # This is safe as they'll return None if not started self._managed_agents.remove(agent) if len(self._managed_agents) == 0: @@ -624,7 +628,11 @@ def shutdown( with self._lock: agents = list(self._managed_agents) for ag in agents: - agent.instrumentor.uninstrument() + try: + agent.instrumentor.uninstrument() + except AttributeError: + # this happens if the agent has no instrumentor + pass ag.stop() self._close_wf_client() self._close_dapr_client() diff --git a/quickstarts/01-dapr-agents-fundamentals/05_agent_memory.py b/quickstarts/01-dapr-agents-fundamentals/05_agent_memory.py index 75d344cf3..ac99e81d3 100644 --- a/quickstarts/01-dapr-agents-fundamentals/05_agent_memory.py +++ b/quickstarts/01-dapr-agents-fundamentals/05_agent_memory.py @@ -21,7 +21,7 @@ async def main() -> None: # Configure the agent to use Dapr State Store for conversation history. 
memory=AgentMemoryConfig( store=ConversationDaprStateMemory( - store_name="conversation-statestore", + store_name="agent-memory", session_id=Path(__file__).stem, ) ), diff --git a/quickstarts/01-dapr-agents-fundamentals/06_durable_agent_http.py b/quickstarts/01-dapr-agents-fundamentals/06_durable_agent_http.py index 9e6a15b10..23af38aee 100644 --- a/quickstarts/01-dapr-agents-fundamentals/06_durable_agent_http.py +++ b/quickstarts/01-dapr-agents-fundamentals/06_durable_agent_http.py @@ -22,13 +22,13 @@ def main() -> None: # Configure the agent to use Dapr State Store for conversation history. memory=AgentMemoryConfig( store=ConversationDaprStateMemory( - store_name="conversation-statestore", + store_name="agent-memory", session_id=Path(__file__).stem, ) ), # This is where the execution state is stored state=AgentStateConfig( - store=StateStoreService(store_name="workflow-statestore"), + store=StateStoreService(store_name="agent-workflow"), ), ) diff --git a/quickstarts/01-dapr-agents-fundamentals/07_durable_agent_pubsub.py b/quickstarts/01-dapr-agents-fundamentals/07_durable_agent_pubsub.py index 82fd3e3db..cf21d0a46 100644 --- a/quickstarts/01-dapr-agents-fundamentals/07_durable_agent_pubsub.py +++ b/quickstarts/01-dapr-agents-fundamentals/07_durable_agent_pubsub.py @@ -28,17 +28,17 @@ async def main() -> None: # Configure the agent to use Dapr State Store for conversation history. memory=AgentMemoryConfig( store=ConversationDaprStateMemory( - store_name="conversation-statestore", + store_name="agent-memory", session_id=Path(__file__).stem, ) ), # This is where the execution state is stored state=AgentStateConfig( - store=StateStoreService(store_name="workflow-statestore"), + store=StateStoreService(store_name="agent-workflow"), ), # This is where the agent listens for incoming tasks. 
pubsub=AgentPubSubConfig( - pubsub_name="message-pubsub", + pubsub_name="agent-pubsub", agent_topic="weather.requests", broadcast_topic="agents.broadcast", ), diff --git a/quickstarts/01-dapr-agents-fundamentals/08_workflow_llm.py b/quickstarts/01-dapr-agents-fundamentals/08_workflow_llm.py index 95f8bd807..f8e53ce29 100644 --- a/quickstarts/01-dapr-agents-fundamentals/08_workflow_llm.py +++ b/quickstarts/01-dapr-agents-fundamentals/08_workflow_llm.py @@ -5,7 +5,6 @@ from dotenv import load_dotenv from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import llm_activity load_dotenv() @@ -27,22 +26,21 @@ def analyze_topic(ctx: DaprWorkflowContext, topic: str): @wfr.activity(name="create_outline") -@llm_activity( - prompt="Create a very short outline about the topic '{topic}'. Provide 5 bullet points only.", - llm=llm, -) def create_outline(ctx, topic: str) -> str: - # The llm_activity decorator handles the actual LLM invocation. - pass + return str( + llm.generate( + prompt=f"Create a very short outline about the topic '{topic}'. Provide 5 bullet points only." 
+ ) + ) @wfr.activity(name="write_blog") -@llm_activity( - prompt="Write a short (2 paragraphs) friendly blog post following this outline:\n{outline}", - llm=llm, -) def write_blog(ctx, outline: str) -> str: - pass + return str( + llm.generate( + prompt=f"Write a short (2 paragraphs) friendly blog post following this outline:\n{outline}" + ) + ) if __name__ == "__main__": diff --git a/quickstarts/01-dapr-agents-fundamentals/09_expert_agent.py b/quickstarts/01-dapr-agents-fundamentals/09_expert_agent.py new file mode 100644 index 000000000..b259516e4 --- /dev/null +++ b/quickstarts/01-dapr-agents-fundamentals/09_expert_agent.py @@ -0,0 +1,40 @@ +from dapr_agents import DurableAgent +from dapr_agents.agents.configs import AgentMemoryConfig +from dapr_agents.llm.dapr import DaprChatClient +from dapr_agents.memory import ConversationDaprStateMemory +from dapr_agents.workflow.runners.agent import AgentRunner +from dotenv import load_dotenv + +load_dotenv() +llm = DaprChatClient(component_name="llm-provider") + + +def main(): + expert_agent = DurableAgent( + name="expert_agent", + role="Technical Support Specialist", + goal="Provide recommendations based on customer context and issue.", + instructions=[ + "Provide a clear, actionable recommendation to resolve the issue.", + ], + llm=llm, + memory=AgentMemoryConfig( + store=ConversationDaprStateMemory( + store_name="agent-memory", + session_id=f"expert-agent-session", + ) + ), + ) + runner = AgentRunner() + try: + runner.serve(expert_agent, port=8002) + finally: + runner.shutdown(expert_agent) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down expert agent...") + exit(0) diff --git a/quickstarts/01-dapr-agents-fundamentals/09_triage_agent.py b/quickstarts/01-dapr-agents-fundamentals/09_triage_agent.py new file mode 100644 index 000000000..211b60fb1 --- /dev/null +++ b/quickstarts/01-dapr-agents-fundamentals/09_triage_agent.py @@ -0,0 +1,55 @@ +from dapr_agents import 
DurableAgent, tool +from dapr_agents.agents.configs import AgentMemoryConfig +from dapr_agents.llm.dapr import DaprChatClient +from dapr_agents.memory import ConversationDaprStateMemory +from dapr_agents.workflow.runners.agent import AgentRunner +from dotenv import load_dotenv + +load_dotenv() +llm = DaprChatClient(component_name="llm-provider") + + +@tool +def get_customer_info(customer_name: str) -> str: + """Get customer information by name. Returns a simple text description.""" + customers = { + "alice": "Customer: Alice, Premium Plan, 5 active services", + "bob": "Customer: Bob, Standard Plan, 2 active services", + "charlie": "Customer: Charlie, Basic Plan, 1 active service", + } + return customers.get( + customer_name.lower(), + f"Customer: {customer_name}, Standard Plan, 1 active service", + ) + + +def main(): + triage_agent = DurableAgent( + name="triage_agent", + role="Customer Support Triage Assistant", + goal="Gather customer information and prepare a triage summary.", + instructions=[ + "Use the tool to get customer information, then combine it with the issue description.", + ], + llm=llm, + tools=[get_customer_info], + memory=AgentMemoryConfig( + store=ConversationDaprStateMemory( + store_name="agent-memory", + session_id=f"triage-agent-session", + ) + ), + ) + runner = AgentRunner() + try: + runner.serve(triage_agent, port=8001) + finally: + runner.shutdown(triage_agent) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down triage agent...") + exit(0) diff --git a/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.py b/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.py index c38e6c2b0..6dd33488c 100644 --- a/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.py +++ b/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.py @@ -1,85 +1,26 @@ -from pathlib import Path import time import dapr.ext.workflow as wf -from dapr.ext.workflow import DaprWorkflowContext, WorkflowRuntime 
-from dotenv import load_dotenv -from dapr_agents import Agent, tool -from dapr_agents.agents.configs import AgentMemoryConfig -from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.memory import ConversationDaprStateMemory -from dapr_agents.workflow.decorators import agent_activity +wfr = wf.WorkflowRuntime() -load_dotenv() -# Initialize workflow runtime + LLM client -wfr = WorkflowRuntime() -llm = DaprChatClient(component_name="llm-provider") - - -# ------------- TOOLS ------------- -@tool -def get_customer_info(customer_name: str) -> str: - """Get customer information by name. Returns a simple text description.""" - # Simple mock customer data - customers = { - "alice": "Customer: Alice, Premium Plan, 5 active services", - "bob": "Customer: Bob, Standard Plan, 2 active services", - "charlie": "Customer: Charlie, Basic Plan, 1 active service", - } - return customers.get( - customer_name.lower(), - f"Customer: {customer_name}, Standard Plan, 1 active service", - ) - - -# ------------- AGENTS ------------- -triage_agent = Agent( - name="Triage Agent", - role="Customer Support Triage Assistant", - goal="Gather customer information and prepare a triage summary.", - instructions=[ - "Use the tool to get customer information, then combine it with the issue description.", - ], - llm=llm, - tools=[get_customer_info], - memory=AgentMemoryConfig( - store=ConversationDaprStateMemory( - store_name="conversation-statestore", - session_id=f"{Path(__file__).stem}-triage", - ) - ), -) - -expert_agent = Agent( - name="Expert Agent", - role="Technical Support Specialist", - goal="Provide recommendations based on customer context and issue.", - instructions=[ - "Provide a clear, actionable recommendation to resolve the issue.", - ], - llm=llm, - memory=AgentMemoryConfig( - store=ConversationDaprStateMemory( - store_name="conversation-statestore", - session_id=f"{Path(__file__).stem}-expert", - ) - ), -) - - -# ------------- WORKFLOW ------------- 
@wfr.workflow(name="support_workflow") -def support_workflow(ctx: DaprWorkflowContext, request: dict): +def support_workflow(ctx: wf.DaprWorkflowContext, request: dict) -> str: """Process a support request through triage and expert agents.""" # Each step is durable and can be retried - triage_result = yield ctx.call_activity(triage_request, input=request) + triage_result = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": f"Assist with the following support request:\n\n{request}"}, + app_id="triage-agent", + ) if triage_result: print("Triage result:", triage_result.get("content", ""), flush=True) - recommendation = yield ctx.call_activity( - get_recommendation, input=triage_result.get("content", "") + recommendation = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": triage_result.get("content", "")}, + app_id="expert-agent", ) if recommendation: print("Recommendation:", recommendation.get("content", ""), flush=True) @@ -87,24 +28,6 @@ def support_workflow(ctx: DaprWorkflowContext, request: dict): return recommendation.get("content", "") if recommendation else "" -# ------------- ACTIVITIES ------------- -@wfr.activity(name="triage_request") -@agent_activity(agent=triage_agent) -def triage_request(ctx, customer: str, issue: str) -> dict: - """Triage the support request by gathering customer info and summarizing. - - The workflow passes a dict with `customer` and `issue` keys, which map to these parameters. 
- """ - pass - - -@wfr.activity(name="get_recommendation") -@agent_activity(agent=expert_agent) -def get_recommendation(ctx) -> dict: - """Get expert recommendation based on triage summary.""" - pass - - if __name__ == "__main__": wfr.start() time.sleep(5) # give the runtime time to initialize diff --git a/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.yaml b/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.yaml new file mode 100644 index 000000000..8d3d969ab --- /dev/null +++ b/quickstarts/01-dapr-agents-fundamentals/09_workflow_agents.yaml @@ -0,0 +1,21 @@ +version: 1 +common: + resourcesPath: ./components + logLevel: info + appLogDestination: console + daprdLogDestination: console + +apps: + - appID: triage-agent + appDirPath: ./ + command: ["python3", "09_triage_agent.py"] + appPort: 8001 + + - appID: expert-agent + appDirPath: ./ + command: ["python3", "09_expert_agent.py"] + appPort: 8002 + + - appID: workflow + appDirPath: ./ + command: ["python3", "09_workflow_agents.py"] \ No newline at end of file diff --git a/quickstarts/01-dapr-agents-fundamentals/10_durable_agent_tracing.py b/quickstarts/01-dapr-agents-fundamentals/10_durable_agent_tracing.py index c50a335ee..ce6772d6f 100644 --- a/quickstarts/01-dapr-agents-fundamentals/10_durable_agent_tracing.py +++ b/quickstarts/01-dapr-agents-fundamentals/10_durable_agent_tracing.py @@ -28,12 +28,12 @@ async def main() -> None: llm=DaprChatClient(component_name="llm-provider"), memory=AgentMemoryConfig( store=ConversationDaprStateMemory( - store_name="conversation-statestore", + store_name="agent-memory", session_id=Path(__file__).stem, ) ), state=AgentStateConfig( - store=StateStoreService(store_name="workflow-statestore"), + store=StateStoreService(store_name="agent-workflow"), ), agent_observability=AgentObservabilityConfig( enabled=True, diff --git a/quickstarts/01-dapr-agents-fundamentals/README.md b/quickstarts/01-dapr-agents-fundamentals/README.md index c28bbd217..86c00508b 100644 --- 
a/quickstarts/01-dapr-agents-fundamentals/README.md +++ b/quickstarts/01-dapr-agents-fundamentals/README.md @@ -69,7 +69,7 @@ Replace `OPENAI_API_KEY` with your actual OpenAI API key. If you are using a dif This example shows the simplest way to call an LLM using the Dapr Chat Client, which sends prompts through the Dapr Conversation API. It’s a minimal starting point before introducing agents in later examples. ```bash -dapr run --app-id llm-client --resources-path resources -- python 01_llm_client.py +dapr run --app-id llm-client --resources-path components -- python 01_llm_client.py ``` ## Expected Behavior @@ -91,7 +91,7 @@ Dapr Agents also include native LLM clients for other modalities (e.g., audio), This example introduces the basic concept of a Dapr Agent. An agent wraps an LLM with a name, role, and instructions that define how it should behave. Unlike the previous example—where you called the LLM directly—an agent provides a reusable interface you can trigger multiple times, and it will consistently act according to its assigned role. ```bash -dapr run --app-id agent-llm --resources-path resources -- python 02_agent_llm.py +dapr run --app-id agent-llm --resources-path components -- python 02_agent_llm.py ``` ## Expected Behavior @@ -113,7 +113,7 @@ Modify the agent’s role or instructions and observe how its behavior changes w This example shows how to quickly create an agent with a custom prompt, backed by the Dapr Conversation API, and how to expose a local Python function as a tool the agent can call during reasoning. It demonstrates the simplest way to run an agent locally as a regular Python program while benefiting from Dapr’s LLM abstraction. 
```bash -dapr run --app-id agent-llm --resources-path resources -- python 03_agent_llm_tools.py +dapr run --app-id agent-llm --resources-path components -- python 03_agent_llm_tools.py ``` ## Expected Behavior @@ -137,7 +137,7 @@ When you run the script, the agent receives a weather question, invokes a local This example is very similar to the previous one, except that the agent does not use hard-coded Python functions as tools. Instead, it dynamically discovers its tools from an MCP (Model Context Protocol) server running locally over STDIO, allowing tools to be added or modified without changing the agent code. ```bash -dapr run --app-id agent-mcp --resources-path resources -- python 04_agent_mcp_tools.py +dapr run --app-id agent-mcp --resources-path components -- python 04_agent_mcp_tools.py ``` ## Expected Behavior @@ -163,7 +163,7 @@ When you run the script, the agent queries the MCP server for available tools, i This example shows how to create an agent that can store and recall its full conversation history across multiple interactions using a Dapr state store. By persisting the session history, the agent can continue a multi-turn dialog and provide answers informed by prior messages. ```bash -dapr run --app-id agent-memory --resources-path resources -- python 05_agent_memory.py +dapr run --app-id agent-memory --resources-path components -- python 05_agent_memory.py ``` ## Expected Behavior @@ -188,7 +188,7 @@ The script runs two prompts in sequence: the agent answers the initial weather q This example turns the previous agent into a durable agent backed by the Dapr Workflow engine. Instead of running interactions in-process, every step of the agent’s execution is persisted to durable storage, allowing long-running interactions to survive interruptions. The agent exposes an HTTP endpoint to start a new workflow and provides a way to query progress or retrieve the final result at any time. 
```bash -dapr run --app-id durable-agent --resources-path resources -- python 06_durable_agent_http.py +dapr run --app-id durable-agent --resources-path components -- python 06_durable_agent_http.py ``` On a different terminal, trigger the agent: @@ -247,7 +247,7 @@ This example takes the same durable agent behavior from the previous example, bu The agent code remains unchanged; only the AgentRunner configuration switches from REST to pub/sub. ```bash -dapr run --app-id durable-agent-subscriber --resources-path resources --dapr-http-port 3500 -- python 07_durable_agent_pubsub.py +dapr run --app-id durable-agent-subscriber --resources-path components --dapr-http-port 3500 -- python 07_durable_agent_pubsub.py ``` On a different terminal, publish a message to the subscribed topic: @@ -277,7 +277,7 @@ Try publishing multiple messages to the topic and observe the agent process each This example does not use an agent. Instead, it demonstrates how to create a Dapr workflow that performs LLM calls in a deterministic, durable sequence. ```bash -dapr run --app-id workflow-llms --resources-path resources -- python 08_workflow_llm.py +dapr run --app-id workflow-llms --resources-path components -- python 08_workflow_llm.py ``` ## Expected Behavior @@ -286,8 +286,8 @@ The workflow generates a short outline for the given topic using an LLM, then us ## How This Works -1. The workflow first performs an LLM-backed activity that generates an outline from the topic. This activity is decorated with `@llm_activity`, a Dapr Agents annotation that simplifies workflow activities by automatically wiring in the LLM client and performing the model invocation for you. -2. The resulting outline is passed to a second `@llm_activity`-decorated activity, which uses the LLM to generate the final blog post. This output is returned as the result of the workflow. +1. The workflow first performs an LLM-backed activity that generates an outline from the topic. 
This activity uses a direct LLM call, optionally with schema validation, for predictable and validated output. +2. The resulting outline is passed to a second LLM-backed activity, which uses the LLM to generate the final blog post. This output is returned as the result of the workflow. ## How to Extend This Example * Modify the workflow to include additional activities that do not interact with LLMs, such as inserting validation steps, transformations, or business logic between LLM activities. @@ -300,7 +300,7 @@ The workflow generates a short outline for the given topic using an LLM, then us This example shows how a workflow can invoke entire agents as workflow activities, allowing you to orchestrate multi-step agent reasoning in a durable and deterministic way. Unlike previous examples where activities called LLMs directly, this workflow delegates each step to an agent with tools and memory, while the workflow engine provides durability and reliable progression. ```bash -dapr run --app-id workflow-agents --resources-path resources -- python 09_workflow_agents.py +dapr run --app-id workflow-agents --resources-path components -- python 09_workflow_agents.py ``` ## Expected Behavior @@ -309,7 +309,7 @@ When the workflow runs, it first delegates the request to a triage agent, which ## How This Works -1. The workflow invokes each agent using activities decorated with @agent_activity, which handles calling the agent and returning structured output. +1. The workflow invokes each agent by calling agent-backed activities as child workflows using `ctx.call_child_workflow`, which handles calling the agent and returning structured output. 2. The triage activity runs first, producing a summary based on customer data and the issue description. 3. The output of the triage agent is passed into the expert agent activity to generate the final recommendation. 4. 
Although agents can use tools and maintain their own memory, the workflow execution is what provides durability: if interrupted, it restarts from the last completed step. @@ -339,7 +339,7 @@ docker run -d -p 9411:9411 openzipkin/zipkin Now run the durable agent with tracing enabled and prompting included: ``` -dapr run --app-id durable-agent-trace --resources-path resources -- python 10_durable_agent_tracing.py +dapr run --app-id durable-agent-trace --resources-path components -- python 10_durable_agent_tracing.py ``` ## Expected Behavior diff --git a/quickstarts/01-dapr-agents-fundamentals/resources/conversation-statestore.yaml b/quickstarts/01-dapr-agents-fundamentals/components/agent-memory.yaml similarity index 86% rename from quickstarts/01-dapr-agents-fundamentals/resources/conversation-statestore.yaml rename to quickstarts/01-dapr-agents-fundamentals/components/agent-memory.yaml index b4d00f9d4..067ee993b 100644 --- a/quickstarts/01-dapr-agents-fundamentals/resources/conversation-statestore.yaml +++ b/quickstarts/01-dapr-agents-fundamentals/components/agent-memory.yaml @@ -1,7 +1,7 @@ apiVersion: dapr.io/v1alpha1 kind: Component metadata: - name: conversation-statestore + name: agent-memory spec: type: state.redis version: v1 diff --git a/quickstarts/01-dapr-agents-fundamentals/resources/message-pubsub.yaml b/quickstarts/01-dapr-agents-fundamentals/components/agent-pubsub.yaml similarity index 89% rename from quickstarts/01-dapr-agents-fundamentals/resources/message-pubsub.yaml rename to quickstarts/01-dapr-agents-fundamentals/components/agent-pubsub.yaml index 3990a6ecb..d2d800cac 100644 --- a/quickstarts/01-dapr-agents-fundamentals/resources/message-pubsub.yaml +++ b/quickstarts/01-dapr-agents-fundamentals/components/agent-pubsub.yaml @@ -1,7 +1,7 @@ apiVersion: dapr.io/v1alpha1 kind: Component metadata: - name: message-pubsub + name: agent-pubsub spec: type: pubsub.redis version: v1 diff --git 
a/quickstarts/01-dapr-agents-fundamentals/resources/agent-registry.yaml b/quickstarts/01-dapr-agents-fundamentals/components/agent-registry.yaml similarity index 100% rename from quickstarts/01-dapr-agents-fundamentals/resources/agent-registry.yaml rename to quickstarts/01-dapr-agents-fundamentals/components/agent-registry.yaml diff --git a/quickstarts/01-dapr-agents-fundamentals/components/agent-runtime.yaml b/quickstarts/01-dapr-agents-fundamentals/components/agent-runtime.yaml new file mode 100644 index 000000000..35ece252e --- /dev/null +++ b/quickstarts/01-dapr-agents-fundamentals/components/agent-runtime.yaml @@ -0,0 +1,16 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: agent-runtimestatestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: enableTLS + value: "false" + - name: keyPrefix + value: none \ No newline at end of file diff --git a/quickstarts/01-dapr-agents-fundamentals/resources/workflow-statestore.yaml b/quickstarts/01-dapr-agents-fundamentals/components/agent-workflow.yaml similarity index 89% rename from quickstarts/01-dapr-agents-fundamentals/resources/workflow-statestore.yaml rename to quickstarts/01-dapr-agents-fundamentals/components/agent-workflow.yaml index c927c9e29..c7f03f193 100644 --- a/quickstarts/01-dapr-agents-fundamentals/resources/workflow-statestore.yaml +++ b/quickstarts/01-dapr-agents-fundamentals/components/agent-workflow.yaml @@ -1,7 +1,7 @@ apiVersion: dapr.io/v1alpha1 kind: Component metadata: - name: workflow-statestore + name: agent-workflow spec: type: state.redis version: v1 diff --git a/quickstarts/01-dapr-agents-fundamentals/resources/llm-provider.yaml b/quickstarts/01-dapr-agents-fundamentals/components/llm-provider.yaml similarity index 100% rename from quickstarts/01-dapr-agents-fundamentals/resources/llm-provider.yaml rename to quickstarts/01-dapr-agents-fundamentals/components/llm-provider.yaml diff 
--git a/quickstarts/04-agent-based-workflows/01_sequential_workflow.py b/quickstarts/04-agent-based-workflows/01_sequential_workflow.py index 36f09cd4e..6822e66ff 100644 --- a/quickstarts/04-agent-based-workflows/01_sequential_workflow.py +++ b/quickstarts/04-agent-based-workflows/01_sequential_workflow.py @@ -5,98 +5,32 @@ import dapr.ext.workflow as wf from dapr.ext.workflow import DaprWorkflowContext -from dotenv import load_dotenv -from dapr_agents import Agent -from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import agent_activity - -# ----------------------------------------------------------------------------- -# Setup -# ----------------------------------------------------------------------------- -load_dotenv() logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) runtime = wf.WorkflowRuntime() -llm = DaprChatClient(component_name="openai") - -# ----------------------------------------------------------------------------- -# Agents -# ----------------------------------------------------------------------------- -extractor = Agent( - name="DestinationExtractor", - role="Extract destination", - instructions=[ - "Extract the main city from the user's message.", - "Return only the city name, nothing else.", - ], - llm=llm, -) - -planner = Agent( - name="PlannerAgent", - role="Trip planner", - instructions=[ - "Create a concise 3-day outline for the given destination.", - "Balance culture, food, and leisure activities.", - ], - llm=llm, -) - -expander = Agent( - name="ItineraryAgent", - role="Itinerary expander", - llm=llm, - instructions=[ - "Expand a 3-day outline into a detailed itinerary.", - "Include Morning, Afternoon, and Evening sections each day.", - ], -) - - -# ----------------------------------------------------------------------------- -# Workflow -# ----------------------------------------------------------------------------- @runtime.workflow(name="chained_planner_workflow") def 
chained_planner_workflow(ctx: DaprWorkflowContext, user_msg: str) -> str: """Plan a 3-day trip using chained agent activities.""" - dest = yield ctx.call_activity(extract_destination, input=user_msg) - outline = yield ctx.call_activity(plan_outline, input=dest["content"]) - itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"]) - return itinerary["content"] - - -# ----------------------------------------------------------------------------- -# Activities (no explicit params, no prompts) -# ----------------------------------------------------------------------------- - - -@runtime.activity(name="extract_destination") -@agent_activity(agent=extractor) -def extract_destination(ctx) -> dict: - """Extract destination city.""" - pass - - -@runtime.activity(name="plan_outline") -@agent_activity(agent=planner) -def plan_outline(ctx) -> dict: - """Generate a 3-day outline for the destination.""" - pass - - -@runtime.activity(name="expand_itinerary") -@agent_activity(agent=expander) -def expand_itinerary(ctx) -> dict: - """Expand the outline into a full detailed itinerary.""" - pass - + dest = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": user_msg}, + app_id="extractor", + ) + outline = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": dest.get("content")}, + app_id="planner", + ) + itinerary = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": outline.get("content")}, + app_id="expander", + ) + return itinerary.get("content") -# ----------------------------------------------------------------------------- -# Entrypoint -# ----------------------------------------------------------------------------- if __name__ == "__main__": runtime.start() diff --git a/quickstarts/04-agent-based-workflows/README.md b/quickstarts/04-agent-based-workflows/README.md index 5532c1f18..8b9e2408d 100644 --- a/quickstarts/04-agent-based-workflows/README.md +++ 
b/quickstarts/04-agent-based-workflows/README.md @@ -1,6 +1,6 @@ # Agent-based Workflow Patterns -This quickstart demonstrates how to orchestrate agentic tasks using Dapr Workflows and the `@agent_activity` decorator from Dapr Agents. You’ll learn how to compose multi-step workflows that call autonomous agents—each powered by LLMs—for reasoning, decision-making, and task execution. +This quickstart demonstrates how to orchestrate agentic tasks using Dapr Workflows and agent-backed activities. You’ll learn how to compose multi-step workflows that call autonomous agents—each powered by LLMs—for reasoning, decision-making, and task execution. ## Prerequisites @@ -130,8 +130,7 @@ How It Works #### Code Highlights -* `@agent_activity` decorator: Wraps an activity function so that Dapr automatically delegates its implementation to an Agent. -The function body can remain empty (pass); execution is routed through the agent’s reasoning loop. +* Agent-backed activities: Use `ctx.call_child_workflow` to delegate execution to an Agent. The workflow coordinates agent tasks by calling each agent as a child workflow. 
* Agents: Each agent defines: * name, role, and instructions * a shared llm client (DaprChatClient) diff --git a/quickstarts/04-agent-based-workflows/expander_agent.py b/quickstarts/04-agent-based-workflows/expander_agent.py new file mode 100644 index 000000000..c4ad76260 --- /dev/null +++ b/quickstarts/04-agent-based-workflows/expander_agent.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import logging + +from dotenv import load_dotenv + +from dapr_agents.workflow.runners.agent import AgentRunner +from dapr_agents.agents.durable import DurableAgent +from dapr_agents.llm.dapr import DaprChatClient + +load_dotenv() +logging.basicConfig(level=logging.INFO) +llm = DaprChatClient(component_name="openai") + + +def main(): + expander = DurableAgent( + name="ItineraryAgent", + role="Itinerary expander", + llm=llm, + instructions=[ + "Expand a 3-day outline into a detailed itinerary.", + "Include Morning, Afternoon, and Evening sections each day.", + ], + ) + + runner = AgentRunner() + try: + runner.serve(expander, port=8003) + finally: + runner.shutdown(expander) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down agent...") diff --git a/quickstarts/04-agent-based-workflows/extractor_agent.py b/quickstarts/04-agent-based-workflows/extractor_agent.py new file mode 100644 index 000000000..a26f3d330 --- /dev/null +++ b/quickstarts/04-agent-based-workflows/extractor_agent.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import logging + +from dotenv import load_dotenv + +from dapr_agents.workflow.runners.agent import AgentRunner +from dapr_agents.agents.durable import DurableAgent +from dapr_agents.llm.dapr import DaprChatClient + +load_dotenv() +logging.basicConfig(level=logging.INFO) +llm = DaprChatClient(component_name="openai") + + +def main(): + extractor = DurableAgent( + name="DestinationExtractor", + role="Extract destination", + instructions=[ + "Extract the main city from the user's message.", + 
"Return only the city name, nothing else.", + ], + llm=llm, + ) + + runner = AgentRunner() + try: + runner.serve(extractor, port=8001) + finally: + runner.shutdown(extractor) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down agent...") diff --git a/quickstarts/04-agent-based-workflows/planner_agent.py b/quickstarts/04-agent-based-workflows/planner_agent.py new file mode 100644 index 000000000..82307f859 --- /dev/null +++ b/quickstarts/04-agent-based-workflows/planner_agent.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import logging + +from dotenv import load_dotenv + +from dapr_agents.workflow.runners.agent import AgentRunner +from dapr_agents.agents.durable import DurableAgent +from dapr_agents.llm.dapr import DaprChatClient + +load_dotenv() +logging.basicConfig(level=logging.INFO) +llm = DaprChatClient(component_name="openai") + + +def main(): + planner = DurableAgent( + name="PlannerAgent", + role="Trip planner", + instructions=[ + "Create a concise 3-day outline for the given destination.", + "Balance culture, food, and leisure activities.", + ], + llm=llm, + ) + + runner = AgentRunner() + try: + runner.serve(planner, port=8002) + finally: + runner.shutdown(planner) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down agent...") diff --git a/quickstarts/04-agent-based-workflows/sequential.yaml b/quickstarts/04-agent-based-workflows/sequential.yaml new file mode 100644 index 000000000..4b8fd195c --- /dev/null +++ b/quickstarts/04-agent-based-workflows/sequential.yaml @@ -0,0 +1,27 @@ +# https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-template/#template-properties +version: 1 +common: + resourcesPath: ./components + logLevel: info + appLogDestination: console + daprdLogDestination: console + +apps: +- appID: extractor + appDirPath: ./ + command: ["python3", "extractor_agent.py"] + appPort: 8001 + +- appID: 
planner + appDirPath: ./ + command: ["python3", "planner_agent.py"] + appPort: 8002 + +- appID: expander + appDirPath: ./ + command: ["python3", "expander_agent.py"] + appPort: 8003 + +- appID: workflow + appDirPath: ./ + command: ["python3", "01_sequential_workflow.py"] diff --git a/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py b/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py index 2c77ddcb9..f5ff65047 100644 --- a/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py +++ b/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py @@ -5,7 +5,6 @@ from dotenv import load_dotenv from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import llm_activity # Load environment variables (e.g., API keys, secrets) load_dotenv() @@ -23,12 +22,8 @@ def single_task_workflow(ctx: DaprWorkflowContext, name: str): @runtime.activity(name="describe_person") -@llm_activity( - prompt="Who was {name}?", - llm=llm, -) -async def describe_person(ctx, name: str) -> str: - pass +def describe_person(ctx, name: str) -> str: + return str(llm.generate(prompt=f"Who was {name}?")) if __name__ == "__main__": diff --git a/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py b/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py index 0375c234d..440966c3e 100644 --- a/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py +++ b/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py @@ -6,7 +6,6 @@ from pydantic import BaseModel from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import llm_activity class Dog(BaseModel): @@ -31,20 +30,16 @@ def single_task_workflow_structured(ctx: DaprWorkflowContext, name: str): @runtime.activity(name="describe_dog") -@llm_activity( - prompt=""" -You are a JSON-only API. Return a Dog object for the dog named {name}." 
-JSON schema (informal): -{{ - "name": string, // Dog\'s full name - "bio": string, // 1-3 sentence biography - "breed": string // Primary breed or mixed -}} -""", - llm=llm, -) def describe_dog(ctx, name: str) -> Dog: - pass + result = llm.generate( + prompt=f"You are a JSON-only API. Return a Dog object for the dog named {name}.", + response_format=Dog, + ) + try: + dog = Dog.model_validate(result) + except Exception as e: + raise RuntimeError(f"LLM did not return a valid Dog: {e}") + return dog if __name__ == "__main__": diff --git a/quickstarts/04-llm-based-workflows/03_sequential_workflow.py b/quickstarts/04-llm-based-workflows/03_sequential_workflow.py index ffb2074d8..d574520e0 100644 --- a/quickstarts/04-llm-based-workflows/03_sequential_workflow.py +++ b/quickstarts/04-llm-based-workflows/03_sequential_workflow.py @@ -5,7 +5,6 @@ from dotenv import load_dotenv from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import llm_activity # Load environment variables (e.g., API keys, secrets) load_dotenv() @@ -28,27 +27,20 @@ def task_chain_workflow(ctx: DaprWorkflowContext): @runtime.activity(name="get_character") -@llm_activity( - prompt=""" +def get_character(ctx) -> str: + return str( + llm.generate( + prompt=""" Pick a random character from The Lord of the Rings. Respond with the character's name only. -""", - llm=llm, -) -def get_character(ctx) -> str: - # The llm_activity decorator handles the LLM call using the prompt above. - # Just declare the signature; the body can be empty or 'pass'. - pass +""" + ) + ) @runtime.activity(name="get_line") -@llm_activity( - prompt="What is a famous line by {character}?", - llm=llm, -) def get_line(ctx, character: str) -> str: - # The llm_activity decorator will format the prompt with 'character'. 
- pass + return str(llm.generate(prompt=f"What is a famous line by {character}?")) if __name__ == "__main__": diff --git a/quickstarts/04-llm-based-workflows/04_parallel_workflow.py b/quickstarts/04-llm-based-workflows/04_parallel_workflow.py index 45112af30..6228a5a73 100644 --- a/quickstarts/04-llm-based-workflows/04_parallel_workflow.py +++ b/quickstarts/04-llm-based-workflows/04_parallel_workflow.py @@ -8,7 +8,6 @@ from pydantic import BaseModel, Field from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import llm_activity # Load environment variables (API keys, etc.) load_dotenv() @@ -67,60 +66,35 @@ def research_workflow(ctx: DaprWorkflowContext, topic: str): # ----- Activities ----- -@runtime.activity(name="generate_questions") -@llm_activity( - prompt=""" -You are a research assistant. Generate exactly 3 focused research questions about the topic: {topic}. -Return ONLY a JSON object matching this schema (no prose): - -{{ - "questions": [ - {{ "text": "..." }}, - {{ "text": "..." }}, - {{ "text": "..." }} - ] -}} -""", - llm=llm, -) def generate_questions(ctx, topic: str) -> Questions: - # Implemented by llm_activity via the prompt above. - pass + result = llm.generate( + prompt=f"You are a research assistant. Generate exactly 3 focused research questions about the topic: {topic}. Return only a JSON object with a 'questions' list, each item having a 'text' field.", + response_format=Questions, + ) + try: + questions = Questions.model_validate(result) + except Exception as e: + raise RuntimeError(f"LLM did not return valid Questions: {e}") + return questions @runtime.activity(name="gather_information") -@llm_activity( - prompt=""" -Research the following question and provide a detailed, well-cited answer (paragraphs + bullet points where helpful). -Question: {question} -""", - llm=llm, -) def gather_information(ctx, question: str) -> str: - # Implemented by llm_activity via the prompt above. 
- pass + return str( + llm.generate( + prompt=f"Research the following question and provide a detailed, well-cited answer (paragraphs + bullet points where helpful).\nQuestion: {question}\n" + ) + ) @runtime.activity(name="synthesize_results") -@llm_activity( - prompt=""" -Create a comprehensive research report on the topic "{topic}" using the following research findings: - -{research_results} - -Requirements: -- Clear executive summary (3-5 sentences) -- Key findings (bulleted) -- Risks/unknowns -- Short conclusion - -Return plain text (no JSON). -""", - llm=llm, -) def synthesize_results(ctx, topic: str, research_results: List[str]) -> str: - # Implemented by llm_activity via the prompt above. - pass + return str( + llm.generate( + prompt=f""" +Create a comprehensive research report on the topic \"{topic}\" using the following research findings:\n\n{research_results}\n\nRequirements:\n- Clear executive summary (3-5 sentences)\n- Key findings (bulleted)\n- Risks/unknowns\n- Short conclusion\n\nReturn plain text (no JSON).\n""" + ) + ) # ----- Entrypoint ----- diff --git a/quickstarts/04-llm-based-workflows/README.md b/quickstarts/04-llm-based-workflows/README.md index 5df5b8ffe..64d6aa1d3 100644 --- a/quickstarts/04-llm-based-workflows/README.md +++ b/quickstarts/04-llm-based-workflows/README.md @@ -1,7 +1,7 @@ # LLM-based Workflow Patterns -This quickstart demonstrates how to orchestrate sequential and parallel tasks using Dapr Agents' workflow capabilities powered by Language Models (LLMs). You'll learn how to build resilient, stateful workflows that leverage LLMs for reasoning, structured output, and automation, all using the new `@llm_activity` decorator and native Dapr workflow runtime. +This quickstart demonstrates how to orchestrate sequential and parallel tasks using Dapr Agents' workflow capabilities powered by Language Models (LLMs). 
You'll learn how to build resilient, stateful workflows that leverage LLMs for reasoning, structured output, and automation, using direct LLM calls with schema validation and the native Dapr workflow runtime. ## Prerequisites @@ -112,7 +112,7 @@ dapr run --app-id dapr-agent-wf-single --resources-path $temp_resources_folder - **Why start here?** - Shows how to define a workflow + activity with `WorkflowRuntime` -- Demonstrates `@llm_activity` returning plain text +- Demonstrates direct LLM activity returning plain text - Uses `DaprWorkflowClient` to schedule and await a single run ### 2. Single LLM Activity (Structured Output) @@ -124,7 +124,7 @@ dapr run --app-id dapr-agent-wf-structured --resources-path $temp_resources_fold ``` **Key ideas** -- `@llm_activity` can deserialize into typed models (e.g., `Dog`) +- Activities can deserialize LLM output into typed models (e.g., `Dog`) using response_format and schema validation - Perfect for downstream steps that expect strongly typed data ### 3. Sequential Task Execution diff --git a/quickstarts/04-message-router-workflow/README.md b/quickstarts/04-message-router-workflow/README.md index d9ac457d5..18f3dff2f 100644 --- a/quickstarts/04-message-router-workflow/README.md +++ b/quickstarts/04-message-router-workflow/README.md @@ -1,6 +1,6 @@ # Message Router Workflow (Pub/Sub → Workflow) -This quickstart shows how to trigger a Dapr Workflow directly from a Pub/Sub message using the `@message_router` decorator. The decorator is applied to the workflow itself, enabling automatic message validation and workflow scheduling. Activities use the `@llm_activity` decorator to offload work to an LLM. +This quickstart shows how to trigger a Dapr Workflow directly from a Pub/Sub message using the `@message_router` decorator. The decorator is applied to the workflow itself, enabling automatic message validation and workflow scheduling. Activities use direct LLM calls (with optional schema validation) to offload work to an LLM. 
You'll run two processes: @@ -117,7 +117,7 @@ spec: 04-message-router-workflow/ ├─ components/ # Dapr components (pubsub, conversation, workflow state) ├─ app.py # Starts WorkflowRuntime + registers message router -├─ workflow.py # @message_router decorated workflow & @llm_activity activities +├─ workflow.py # @message_router decorated workflow & direct LLM activities └─ message_client.py # publishes a test message to the topic ``` @@ -126,7 +126,7 @@ spec: * `message_client.py` publishes a CloudEvent-style JSON payload to `topic=blog.requests` on `pubsub=messagepubsub`. * `app.py` starts the Dapr Workflow runtime, registers `blog_workflow` + activities, and calls `register_message_routes(targets=[blog_workflow])`. * `register_message_routes` discovers the `@message_router` decorator on `blog_workflow`, validates incoming messages using the Pydantic model (`StartBlogMessage`), and automatically schedules the workflow when valid messages arrive. -* `workflow.py` runs `blog_workflow`, calling two LLM-backed activities (`create_outline`, `write_post`) decorated with `@llm_activity`. +* `workflow.py` runs `blog_workflow`, calling two LLM-backed activities (`create_outline`, `write_post`) that use direct LLM calls (with optional schema validation). 
## Code Structure diff --git a/quickstarts/04-message-router-workflow/workflow.py b/quickstarts/04-message-router-workflow/workflow.py index 7a420b63d..0826e1ac3 100644 --- a/quickstarts/04-message-router-workflow/workflow.py +++ b/quickstarts/04-message-router-workflow/workflow.py @@ -5,8 +5,7 @@ from pydantic import BaseModel, Field from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import llm_activity -from dapr_agents.workflow.decorators.routers import message_router +from dapr_agents.workflow.decorators.decorators import message_router load_dotenv() @@ -33,19 +32,17 @@ def blog_workflow(ctx: DaprWorkflowContext, wf_input: dict) -> str: return post -@llm_activity( - prompt="Create a short outline about {topic}. Output 3-5 bullet points.", - llm=llm, -) async def create_outline(ctx, topic: str) -> str: - # Implemented by the decorator; body can be empty. - pass + return str( + llm.generate( + prompt=f"Create a short outline about {topic}. Output 3-5 bullet points." + ) + ) -@llm_activity( - prompt="Write a short blog post following this outline:\n{outline}", - llm=llm, -) async def write_post(ctx, outline: str) -> str: - # Implemented by the decorator; body can be empty. 
- pass + return str( + llm.generate( + prompt=f"Write a short blog post following this outline:\n{outline}" + ) + ) diff --git a/quickstarts/09-agent-observability/01_agent_zipkin.py b/quickstarts/09-agent-observability/01_agent_zipkin.py deleted file mode 100644 index d90c81c61..000000000 --- a/quickstarts/09-agent-observability/01_agent_zipkin.py +++ /dev/null @@ -1,62 +0,0 @@ -import asyncio -from dapr_agents import tool, Agent, OpenAIChatClient -from dotenv import load_dotenv - -load_dotenv() - - -@tool -def my_weather_func() -> str: - """Get current weather.""" - return "It's 72°F and sunny" - - -async def main(): - weather_agent = Agent( - name="WeatherAgent", - role="Weather Assistant", - instructions=["Help users with weather information"], - tools=[my_weather_func], - llm=OpenAIChatClient(model="gpt-3.5-turbo"), - ) - - from opentelemetry import trace - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.exporter.zipkin.json import ZipkinExporter - from opentelemetry.sdk.resources import Resource - from dapr_agents.observability import DaprAgentsInstrumentor - - # Define the service name using a Resource - resource = Resource(attributes={"service.name": "dapr-weather-agents"}) - - # Set up the OpenTelemetry TracerProvider with the resource - tracer_provider = TracerProvider(resource=resource) - - # Configure the Zipkin exporter (no service_name argument here) - zipkin_exporter = ZipkinExporter( - endpoint="http://localhost:9411/api/v2/spans" # default Zipkin endpoint - ) - - # Attach the exporter to the tracer provider - span_processor = BatchSpanProcessor(zipkin_exporter) - tracer_provider.add_span_processor(span_processor) - - # Register the tracer provider globally - trace.set_tracer_provider(tracer_provider) - - # Instrument Dapr Agents - instrumentor = DaprAgentsInstrumentor() - instrumentor.instrument(tracer_provider=tracer_provider) - - # Run the agent - try: - await 
weather_agent.run( - "What is the weather in Virginia, New York and Washington DC?" - ) - except Exception as e: - print(f"Error: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/quickstarts/09-agent-observability/02_agent_otel.py b/quickstarts/09-agent-observability/02_agent_otel.py deleted file mode 100644 index 595cd0cb5..000000000 --- a/quickstarts/09-agent-observability/02_agent_otel.py +++ /dev/null @@ -1,60 +0,0 @@ -import asyncio -from dapr_agents import tool, Agent, OpenAIChatClient -from dotenv import load_dotenv - -load_dotenv() - - -@tool -def my_weather_func() -> str: - """Get current weather.""" - return "It's 72°F and sunny" - - -async def main(): - weather_agent = Agent( - name="WeatherAgent", - role="Weather Assistant", - instructions=["Help users with weather information"], - tools=[my_weather_func], - llm=OpenAIChatClient(model="gpt-3.5-turbo"), - ) - - from opentelemetry import trace - from opentelemetry.sdk.resources import Resource - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter - from dapr_agents.observability import DaprAgentsInstrumentor - - # Define the service name in the resource - resource = Resource(attributes={"service.name": "dapr-weather-agents"}) - - # Set up TracerProvider with resource - tracer_provider = TracerProvider(resource=resource) - - # Set up OTLP exporter (in this example working with HTTP to send traces to Jaeger) - otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces") - - # Set up span processor and add to tracer provider - span_processor = BatchSpanProcessor(otlp_exporter) - tracer_provider.add_span_processor(span_processor) - - # Register tracer provider globally - trace.set_tracer_provider(tracer_provider) - - # Instrument Dapr Agents - instrumentor = DaprAgentsInstrumentor() - 
instrumentor.instrument(tracer_provider=tracer_provider) - - # Run the agent - try: - await weather_agent.run( - "What is the weather in Virginia, New York and Washington DC?" - ) - except Exception as e: - print(f"Error: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/quickstarts/09-agent-observability/README.md b/quickstarts/09-agent-observability/README.md deleted file mode 100644 index 5ffbc2064..000000000 --- a/quickstarts/09-agent-observability/README.md +++ /dev/null @@ -1,89 +0,0 @@ -# Hello World with Dapr Agents and Tracing - -This quickstart provides a hands-on introduction to setting up full end-to-end tracing with Dapr Agents. - -## Prerequisites - -- uv package manager -- OpenAI API key -- Zipkin -- Jaeger - -## Environment Setup - -### Option 1: Using pip (Recommended) - -```bash -uv venv -# Activate the virtual environment -# On Windows: -.venv\Scripts\activate -# On macOS/Linux: -source .venv/bin/activate -uv sync --active -``` - -### Option 2: Using uv - -```bash -# Create and activate virtual environment -uv venv .venv -source .venv/bin/activate - -# Install core dependencies -uv pip install -r requirements.txt -``` - -## Configuration - -Create a `.env` file in the project root: - -```env -OPENAI_API_KEY=your_api_key_here -``` - -Replace `your_api_key_here` with your actual OpenAI API key. - -## Examples - -### 1. Simple Agent with Zipkin tracing - -In this example, we'll run a simple agent that uses Zipkin for distributed tracing. - -Run Zipkin locally: - -```bash -docker run --rm -d -p 9411:9411 --name zipkin openzipkin/zipkin -``` - -Run the agent example to see how to create an agent with custom tools: -```bash -python 01_agent_zipkin.py -``` - -**Expected output:** Visit `http://localhost:9411` in your browser and view the traces. - -![](./Standalone_Agent_zipkin.png) - -### 2. 
Simple Agent with OpenTelemetry tracing (Jaeger) - -In this example, we'll use Jaeger as the tracing backend with an HTTP `OTLPSpanExporter` generic span exporter. - -Run Jaeger locally: - -```bash -docker run -d -e COLLECTOR_OTLP_ENABLED=true -p 4318:4318 -p 16686:16686 jaegertracing/all-in-one:latest -``` - -Run the agent example to see how to create an agent with custom tools: -```bash -python 02_agent_otel.py -``` - -**Expected output:** Visit `http://localhost:16686` in your browser and view the traces. - -![](./Standalone_Agent_otel.png) - -## Next Steps - -After completing these examples, move on to the [Agents as Activities in Workflows with Observability](../09-agents-as-activities-observability/README.md) quickstart to learn how to add observability to workflow-based agent orchestration. diff --git a/quickstarts/09-agent-observability/Standalone_Agent_otel.png b/quickstarts/09-agent-observability/Standalone_Agent_otel.png deleted file mode 100644 index dcc408fd5..000000000 Binary files a/quickstarts/09-agent-observability/Standalone_Agent_otel.png and /dev/null differ diff --git a/quickstarts/09-agent-observability/Standalone_Agent_zipkin.png b/quickstarts/09-agent-observability/Standalone_Agent_zipkin.png deleted file mode 100644 index 60d2436fe..000000000 Binary files a/quickstarts/09-agent-observability/Standalone_Agent_zipkin.png and /dev/null differ diff --git a/quickstarts/09-agent-observability/pyproject.toml b/quickstarts/09-agent-observability/pyproject.toml deleted file mode 100644 index d3c7b9d1e..000000000 --- a/quickstarts/09-agent-observability/pyproject.toml +++ /dev/null @@ -1,11 +0,0 @@ -[project] -name = "09-agent-observability" -version = "0.1.0" -requires-python = ">=3.11, <3.14" -dependencies = [ - "dapr-agents", - "python-dotenv>=1.2.1", -] - -[tool.uv.sources] -dapr-agents = { workspace = true } diff --git a/quickstarts/09-agents-as-activities-observability/README.md b/quickstarts/09-agents-as-activities-observability/README.md 
index 252d37a85..bd2ee2835 100644 --- a/quickstarts/09-agents-as-activities-observability/README.md +++ b/quickstarts/09-agents-as-activities-observability/README.md @@ -130,7 +130,7 @@ spec: ## Sequential Agent Workflow -This example shows how to chain multiple agents inside a Dapr workflow using the `@agent_activity` decorator. Each activity runs an agent with its own instructions, while the workflow orchestrates the overall plan. +This example shows how to chain multiple agents inside a Dapr workflow by orchestrating agent-backed activities as child workflows. Each activity calls an agent with its own instructions using `ctx.call_child_workflow`, while the workflow orchestrates the overall plan. Run the workflow (render components first if you’re using `.env` placeholders): diff --git a/quickstarts/09-agents-as-activities-observability/expander_agent.py b/quickstarts/09-agents-as-activities-observability/expander_agent.py new file mode 100644 index 000000000..ed21f90a1 --- /dev/null +++ b/quickstarts/09-agents-as-activities-observability/expander_agent.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import logging + +from dotenv import load_dotenv + +from dapr_agents.agents.configs import AgentObservabilityConfig, AgentTracingExporter +from dapr_agents.workflow.runners.agent import AgentRunner +from dapr_agents.agents.durable import DurableAgent +from dapr_agents.llm.dapr import DaprChatClient + +load_dotenv() +logging.basicConfig(level=logging.INFO) +llm = DaprChatClient(component_name="openai") + + +def main(): + expander = DurableAgent( + name="ItineraryAgent", + role="Itinerary expander", + llm=llm, + instructions=[ + "Expand a 3-day outline into a detailed itinerary.", + "Include Morning, Afternoon, and Evening sections each day.", + ], + agent_observability=AgentObservabilityConfig( + enabled=True, + tracing_enabled=True, + tracing_exporter=AgentTracingExporter.ZIPKIN, + endpoint="http://localhost:9411/api/v2/spans", + ), + ) + + runner = 
AgentRunner() + try: + runner.serve(expander, port=8003) + finally: + runner.shutdown(expander) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down agent...") diff --git a/quickstarts/09-agents-as-activities-observability/extractor_agent.py b/quickstarts/09-agents-as-activities-observability/extractor_agent.py new file mode 100644 index 000000000..451725244 --- /dev/null +++ b/quickstarts/09-agents-as-activities-observability/extractor_agent.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import logging + +from dotenv import load_dotenv + +from dapr_agents.agents.configs import AgentObservabilityConfig, AgentTracingExporter +from dapr_agents.workflow.runners.agent import AgentRunner +from dapr_agents.agents.durable import DurableAgent +from dapr_agents.llm.dapr import DaprChatClient + +load_dotenv() +logging.basicConfig(level=logging.INFO) +llm = DaprChatClient(component_name="openai") + + +def main(): + extractor = DurableAgent( + name="DestinationExtractor", + role="Extract destination", + instructions=[ + "Extract the main city from the user's message.", + "Return only the city name, nothing else.", + ], + llm=llm, + agent_observability=AgentObservabilityConfig( + enabled=True, + tracing_enabled=True, + tracing_exporter=AgentTracingExporter.ZIPKIN, + endpoint="http://localhost:9411/api/v2/spans", + ), + ) + + runner = AgentRunner() + try: + runner.serve(extractor, port=8001) + finally: + runner.shutdown(extractor) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down agent...") diff --git a/quickstarts/09-agents-as-activities-observability/planner_agent.py b/quickstarts/09-agents-as-activities-observability/planner_agent.py new file mode 100644 index 000000000..164599055 --- /dev/null +++ b/quickstarts/09-agents-as-activities-observability/planner_agent.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import logging + +from dotenv import load_dotenv + 
+from dapr_agents.agents.configs import AgentObservabilityConfig, AgentTracingExporter +from dapr_agents.workflow.runners.agent import AgentRunner +from dapr_agents.agents.durable import DurableAgent +from dapr_agents.llm.dapr import DaprChatClient + +load_dotenv() +logging.basicConfig(level=logging.INFO) +llm = DaprChatClient(component_name="openai") + + +def main(): + planner = DurableAgent( + name="PlannerAgent", + role="Trip planner", + instructions=[ + "Create a concise 3-day outline for the given destination.", + "Balance culture, food, and leisure activities.", + ], + llm=llm, + agent_observability=AgentObservabilityConfig( + enabled=True, + tracing_enabled=True, + tracing_exporter=AgentTracingExporter.ZIPKIN, + endpoint="http://localhost:9411/api/v2/spans", + ), + ) + + runner = AgentRunner() + try: + runner.serve(planner, port=8002) + finally: + runner.shutdown(planner) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("Shutting down agent...") diff --git a/quickstarts/09-agents-as-activities-observability/pyproject.toml b/quickstarts/09-agents-as-activities-observability/pyproject.toml index d602ddcd2..e9d753585 100644 --- a/quickstarts/09-agents-as-activities-observability/pyproject.toml +++ b/quickstarts/09-agents-as-activities-observability/pyproject.toml @@ -3,7 +3,6 @@ name = "09-agents-as-activities-observability" version = "0.1.0" requires-python = ">=3.11, <3.14" dependencies = [ - "arize-phoenix>=12.16.0", "dapr-agents", "python-dotenv>=1.2.1", ] diff --git a/quickstarts/09-agents-as-activities-observability/sequential.yaml b/quickstarts/09-agents-as-activities-observability/sequential.yaml new file mode 100644 index 000000000..372484f46 --- /dev/null +++ b/quickstarts/09-agents-as-activities-observability/sequential.yaml @@ -0,0 +1,27 @@ +# https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-template/#template-properties +version: 1 +common: + resourcesPath: ./components + 
logLevel: info + appLogDestination: console + daprdLogDestination: console + +apps: +- appID: extractor + appDirPath: ./ + command: ["python3", "extractor_agent.py"] + appPort: 8001 + +- appID: planner + appDirPath: ./ + command: ["python3", "planner_agent.py"] + appPort: 8002 + +- appID: expander + appDirPath: ./ + command: ["python3", "expander_agent.py"] + appPort: 8003 + +- appID: workflow + appDirPath: ./ + command: ["python3", "sequential_workflow_tracing.py"] diff --git a/quickstarts/09-agents-as-activities-observability/sequential_workflow.py b/quickstarts/09-agents-as-activities-observability/sequential_workflow.py deleted file mode 100644 index 5f6ffcafb..000000000 --- a/quickstarts/09-agents-as-activities-observability/sequential_workflow.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import logging -import time - -import dapr.ext.workflow as wf -from dapr.ext.workflow import DaprWorkflowContext -from dotenv import load_dotenv - -from dapr_agents import Agent -from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.workflow.decorators import agent_activity - -# ----------------------------------------------------------------------------- -# Setup -# ----------------------------------------------------------------------------- -load_dotenv() -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -runtime = wf.WorkflowRuntime() -llm = DaprChatClient(component_name="openai") - - -# ----------------------------------------------------------------------------- -# Agents -# ----------------------------------------------------------------------------- -extractor = Agent( - name="DestinationExtractor", - role="Extract destination", - instructions=[ - "Extract the main city from the user's message.", - "Return only the city name, nothing else.", - ], - llm=llm, -) - -planner = Agent( - name="PlannerAgent", - role="Trip planner", - instructions=[ - "Create a concise 3-day 
outline for the given destination.", - "Balance culture, food, and leisure activities.", - ], - llm=llm, -) - -expander = Agent( - name="ItineraryAgent", - role="Itinerary expander", - llm=llm, - instructions=[ - "Expand a 3-day outline into a detailed itinerary.", - "Include Morning, Afternoon, and Evening sections each day.", - ], -) - - -# ----------------------------------------------------------------------------- -# Workflow definition -# ----------------------------------------------------------------------------- -@runtime.workflow(name="chained_planner_workflow") -def chained_planner_workflow(ctx: DaprWorkflowContext, user_msg: str) -> str: - dest = yield ctx.call_activity(extract_destination, input=user_msg) - outline = yield ctx.call_activity(plan_outline, input=dest["content"]) - itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"]) - return itinerary["content"] - - -# ----------------------------------------------------------------------------- -# Activities backed by agents -# ----------------------------------------------------------------------------- -@runtime.activity(name="extract_destination") -@agent_activity(agent=extractor) -def extract_destination(ctx) -> dict: - pass - - -@runtime.activity(name="plan_outline") -@agent_activity(agent=planner) -def plan_outline(ctx) -> dict: - pass - - -@runtime.activity(name="expand_itinerary") -@agent_activity(agent=expander) -def expand_itinerary(ctx) -> dict: - pass - - -# ----------------------------------------------------------------------------- -# Entrypoint -# ----------------------------------------------------------------------------- -if __name__ == "__main__": - runtime.start() - time.sleep(5) - - client = wf.DaprWorkflowClient() - user_input = "Plan a trip to Paris." 
- - logger.info("Starting workflow: %s", user_input) - instance_id = client.schedule_new_workflow( - workflow=chained_planner_workflow, - input=user_input, - ) - - logger.info("Workflow started: %s", instance_id) - state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60) - - if not state: - logger.error("No state returned (instance may not exist).") - elif state.runtime_status.name == "COMPLETED": - logger.info("Trip Itinerary:\n%s", state.serialized_output) - else: - logger.error("Workflow ended with status: %s", state.runtime_status) - if state.failure_details: - fd = state.failure_details - logger.error("Failure type: %s", fd.error_type) - logger.error("Failure message: %s", fd.message) - logger.error("Stack trace:\n%s", fd.stack_trace) - else: - logger.error("Custom status: %s", state.serialized_custom_status) - - runtime.shutdown() diff --git a/quickstarts/09-agents-as-activities-observability/sequential_workflow_multi_model_tracing.py b/quickstarts/09-agents-as-activities-observability/sequential_workflow_multi_model_tracing.py deleted file mode 100644 index eae181857..000000000 --- a/quickstarts/09-agents-as-activities-observability/sequential_workflow_multi_model_tracing.py +++ /dev/null @@ -1,115 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import annotations - -import logging -import time - -import dapr.ext.workflow as wf -from dapr.ext.workflow import DaprWorkflowContext -from dotenv import load_dotenv -from phoenix.otel import register - -from dapr_agents import Agent -from dapr_agents.llm.huggingface import HFHubChatClient -from dapr_agents.llm.nvidia import NVIDIAChatClient -from dapr_agents.llm.openai import OpenAIChatClient -from dapr_agents.observability import DaprAgentsInstrumentor -from dapr_agents.workflow.decorators import agent_activity - -load_dotenv() -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -tracer_provider = register( - project_name="dapr-weather-agents", - 
protocol="http/protobuf", -) -instrumentor = DaprAgentsInstrumentor() -instrumentor.instrument(tracer_provider=tracer_provider) - -runtime = wf.WorkflowRuntime() - -openai_llm = OpenAIChatClient(model="gpt-4o-mini") -nvidia_llm = NVIDIAChatClient(model="meta/llama-3.1-8b-instruct") -hf_llm = HFHubChatClient(model="HuggingFaceTB/SmolLM3-3B") - - -extractor = Agent( - name="DestinationExtractor", - role="Extract destination", - instructions=["Extract the main city from the user query"], - llm=openai_llm, -) - -planner = Agent( - name="PlannerAgent", - role="Outline planner", - instructions=["Generate a 3-day outline for the destination"], - llm=nvidia_llm, -) - -expander = Agent( - name="ItineraryAgent", - role="Itinerary expander", - instructions=["Expand the outline into a detailed plan"], - llm=hf_llm, -) - - -@runtime.workflow(name="chained_planner_workflow") -def chained_planner_workflow(ctx: DaprWorkflowContext, user_msg: str) -> str: - dest = yield ctx.call_activity(extract_destination, input=user_msg) - outline = yield ctx.call_activity(plan_outline, input=dest["content"]) - itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"]) - return itinerary["content"] - - -@runtime.activity(name="extract_destination") -@agent_activity(agent=extractor) -def extract_destination(ctx) -> dict: - pass - - -@runtime.activity(name="plan_outline") -@agent_activity(agent=planner) -def plan_outline(ctx) -> dict: - pass - - -@runtime.activity(name="expand_itinerary") -@agent_activity(agent=expander) -def expand_itinerary(ctx) -> dict: - pass - - -if __name__ == "__main__": - runtime.start() - time.sleep(5) - - client = wf.DaprWorkflowClient() - user_input = "Plan a trip to Paris." 
- - logger.info("Starting workflow: %s", user_input) - instance_id = client.schedule_new_workflow( - workflow=chained_planner_workflow, - input=user_input, - ) - - logger.info("Workflow started: %s", instance_id) - state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60) - - if not state: - logger.error("No state returned (instance may not exist).") - elif state.runtime_status.name == "COMPLETED": - logger.info("Trip Itinerary:\n%s", state.serialized_output) - else: - logger.error("Workflow ended with status: %s", state.runtime_status) - if state.failure_details: - fd = state.failure_details - logger.error("Failure type: %s", fd.error_type) - logger.error("Failure message: %s", fd.message) - logger.error("Stack trace:\n%s", fd.stack_trace) - else: - logger.error("Custom status: %s", state.serialized_custom_status) - - runtime.shutdown() diff --git a/quickstarts/09-agents-as-activities-observability/sequential_workflow_tracing.py b/quickstarts/09-agents-as-activities-observability/sequential_workflow_tracing.py index eaced233b..c931a3267 100644 --- a/quickstarts/09-agents-as-activities-observability/sequential_workflow_tracing.py +++ b/quickstarts/09-agents-as-activities-observability/sequential_workflow_tracing.py @@ -6,85 +6,32 @@ import dapr.ext.workflow as wf from dapr.ext.workflow import DaprWorkflowContext -from dotenv import load_dotenv -from phoenix.otel import register -from dapr_agents import Agent -from dapr_agents.llm.dapr import DaprChatClient -from dapr_agents.observability import DaprAgentsInstrumentor -from dapr_agents.workflow.decorators import agent_activity - -load_dotenv() logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# OpenTelemetry instrumentation (Phoenix) -tracer_provider = register( - project_name="dapr-weather-agents", - protocol="http/protobuf", -) -instrumentor = DaprAgentsInstrumentor() -instrumentor.instrument(tracer_provider=tracer_provider) - runtime = wf.WorkflowRuntime() -llm = 
DaprChatClient(component_name="openai") - - -extractor = Agent( - name="DestinationExtractor", - role="Extract destination", - instructions=[ - "Extract the main city from the user's message.", - "Return only the city name, nothing else.", - ], - llm=llm, -) - -planner = Agent( - name="PlannerAgent", - role="Trip planner", - instructions=[ - "Create a concise 3-day outline for the given destination.", - "Balance culture, food, and leisure activities.", - ], - llm=llm, -) - -expander = Agent( - name="ItineraryAgent", - role="Itinerary expander", - llm=llm, - instructions=[ - "Expand a 3-day outline into a detailed itinerary.", - "Include Morning, Afternoon, and Evening sections each day.", - ], -) @runtime.workflow(name="chained_planner_workflow") def chained_planner_workflow(ctx: DaprWorkflowContext, user_msg: str) -> str: - dest = yield ctx.call_activity(extract_destination, input=user_msg) - outline = yield ctx.call_activity(plan_outline, input=dest["content"]) - itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"]) - return itinerary["content"] - - -@runtime.activity(name="extract_destination") -@agent_activity(agent=extractor) -def extract_destination(ctx) -> dict: - pass - - -@runtime.activity(name="plan_outline") -@agent_activity(agent=planner) -def plan_outline(ctx) -> dict: - pass - - -@runtime.activity(name="expand_itinerary") -@agent_activity(agent=expander) -def expand_itinerary(ctx) -> dict: - pass + """Plan a 3-day trip using chained agent activities.""" + dest = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": user_msg}, + app_id="extractor", + ) + outline = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": dest.get("content")}, + app_id="planner", + ) + itinerary = yield ctx.call_child_workflow( + workflow="agent_workflow", + input={"task": outline.get("content")}, + app_id="expander", + ) + return itinerary.get("content") if __name__ == "__main__": diff --git 
a/quickstarts/README.md b/quickstarts/README.md index 67dbd5a9f..9cf9af19f 100644 --- a/quickstarts/README.md +++ b/quickstarts/README.md @@ -94,7 +94,7 @@ This quickstart demonstrates how to build a weather assistant with durable, work ### LLM-based Workflow Patterns -Learn to orchestrate stateful, resilient workflows powered by Language Models (LLMs) using `@llm_activity` decorator. +Learn to orchestrate stateful, resilient workflows powered by Language Models (LLMs). - **LLM-powered Tasks**: Automate reasoning and decision-making in workflows - **Task Chaining**: Build multi-step processes with reliable state management @@ -106,7 +106,7 @@ This quickstart demonstrates how to design and run sequential and parallel workf ### Agent-based Workflow Patterns -Learn to orchestrate **autonomous, role-driven agents** inside Dapr Workflows using the `@agent_activity` decorator. +Learn to orchestrate **autonomous, role-driven agents** inside Dapr Workflows by calling agent-backed activities as child workflows. These patterns focus on chaining and coordinating specialized agents that reason, plan, and act within durable, stateful workflows. > Currently, this does not work with `DurableAgents`. 
diff --git a/tests/agents/durableagent/test_durable_agent.py b/tests/agents/durableagent/test_durable_agent.py index 953d70512..70921fc20 100644 --- a/tests/agents/durableagent/test_durable_agent.py +++ b/tests/agents/durableagent/test_durable_agent.py @@ -338,61 +338,6 @@ def test_tool_calling_workflow_initialization( assert instance_data.source is None assert instance_data.triggering_workflow_instance_id == "parent-instance-123" - @pytest.mark.asyncio - async def test_call_llm_activity(self, basic_durable_agent): - """Test that call_llm unwraps an LLMChatResponse properly.""" - - # create a fake LLMChatResponse with one choice - fake_response = LLMChatResponse( - results=[ - LLMChatCandidate( - message=AssistantMessage(content="Test response", tool_calls=[]), - finish_reason="stop", - ) - ], - metadata={}, - ) - basic_durable_agent.llm.generate = Mock(return_value=fake_response) - - instance_id = "test-instance-123" - # set up a minimal instance record - basic_durable_agent.state["instances"] = { - instance_id: { - "input": "Test task", - "source": "test_source", - "triggering_workflow_instance_id": None, - "workflow_instance_id": instance_id, - "workflow_name": "AgenticWorkflow", - "status": "RUNNING", - "messages": [], - "tool_history": [], - "end_time": None, - "trace_context": None, - } - } - - from datetime import datetime - - test_time = datetime.fromisoformat( - "2024-01-01T00:00:00Z".replace("Z", "+00:00") - ) - - # Mock the activity context - mock_ctx = Mock() - - assistant_dict = basic_durable_agent.call_llm( - mock_ctx, - { - "instance_id": instance_id, - "time": test_time.isoformat(), - "task": "Test task", - }, - ) - # The dict dumped from AssistantMessage should have our content - assert assistant_dict["content"] == "Test response" - assert assistant_dict["tool_calls"] == [] - basic_durable_agent.llm.generate.assert_called_once() - @pytest.mark.asyncio async def test_broadcast_message_to_agents_activity(self, basic_durable_agent): """Test 
broadcasting message to agents activity.""" diff --git a/tests/integration/quickstarts/conftest.py b/tests/integration/quickstarts/conftest.py index 8e4ce6974..fe05d6cb3 100644 --- a/tests/integration/quickstarts/conftest.py +++ b/tests/integration/quickstarts/conftest.py @@ -9,6 +9,8 @@ import re from pathlib import Path from typing import Optional, Dict, Any +import yaml +import tempfile import pytest logger = logging.getLogger(__name__) @@ -57,11 +59,8 @@ def setup_quickstart_venv(quickstart_dir: Path, project_root: Path) -> Path: # The venv's Python is typically a symlink, but we want to use it directly, not resolve it venv_python = venv_python.absolute() - # Skip installation if already done (for parallel execution) - installed_marker = venv_path / ".installed" - if installed_marker.exists(): - logger.info(f"Dependencies already installed for {quickstart_dir}") - else: + # Always run uv sync (it's fast when dependencies are already installed) + if True: # Set up environment to ensure uv uses the venv Python # Add venv's bin directory to PATH so uv can find the venv Python venv_bin = venv_path / "bin" @@ -85,7 +84,7 @@ def try_uv_install(description: str): """Try uv sync from workspace root""" # Run uv sync from workspace root to ensure all workspace dependencies are available result = subprocess.run( - ["uv", "sync", "--frozen"], + ["uv", "sync", "--frozen", "--all-extras"], cwd=project_root, env=install_env, capture_output=True, @@ -105,8 +104,24 @@ def try_uv_install(description: str): try_uv_install("requirements") - # Mark as installed so if we're running in parallel, we don't reinstall the dependencies. 
- installed_marker.touch() + # Also sync quickstart-specific dependencies if it has its own pyproject.toml + quickstart_pyproject = quickstart_dir / "pyproject.toml" + if quickstart_pyproject.exists(): + logger.info( + f"Syncing quickstart-specific dependencies from {quickstart_pyproject}" + ) + result = subprocess.run( + ["uv", "sync", "--frozen"], + cwd=quickstart_dir, + env=install_env, + capture_output=True, + text=True, + timeout=180, + ) + if result.returncode != 0: + raise RuntimeError( + f"Failed to install quickstart dependencies: {result.stderr}\n{result.stdout}" + ) return venv_python @@ -460,15 +475,38 @@ def run_quickstart_multi_app( venv_python, _ = _setup_venv_and_python(quickstart_dir, project_root, create_venv) - # Resolve environment variables in components if needed + # Resolve environment variables in components or resources + tmp_path = None resources_path = quickstart_dir / "components" if resources_path.exists(): - resources_path = _resolve_component_env_vars( + tmp_path = _resolve_component_env_vars( resources_path, cwd_path, venv_python, full_env ) + # Build modified YAML with tmp_path if needed + yaml_to_use = dapr_yaml_path + if tmp_path and tmp_path != resources_path: + with open(dapr_yaml_path, "r") as f: + yaml_data = yaml.safe_load(f) + if yaml_data and "common" in yaml_data: + yaml_data["common"]["resourcesPath"] = str(tmp_path) + # Update appDirPath for each app to use absolute path to quickstart_dir + # This ensures scripts are found even when YAML is in a temp directory + if yaml_data and "apps" in yaml_data: + for app in yaml_data["apps"]: + if "appDirPath" in app: + app_dir = app["appDirPath"] + # If relative path, resolve it relative to quickstart_dir + if not Path(app_dir).is_absolute(): + app["appDirPath"] = str((quickstart_dir / app_dir).resolve()) + temp_dir = tempfile.mkdtemp(prefix="rendered_yaml_") + temp_yaml_path = Path(temp_dir) / dapr_yaml_path.name + with open(temp_yaml_path, "w") as f: + yaml.dump(yaml_data, f) + 
yaml_to_use = temp_yaml_path + # Build dapr run -f command - cmd = ["dapr", "run", "-f", str(dapr_yaml_path)] + cmd = ["dapr", "run", "-f", str(yaml_to_use)] # If trigger_curl is provided, we need to run the process in the background, # wait for the server to be ready, send the curl request, then terminate @@ -841,25 +879,27 @@ def _stream_and_detect(pipe, log_func, prefix, lines_list): ) # Check if this is the orchestrator's main workflow - if ( - orchestrator_workflow_name - and workflow_name == orchestrator_workflow_name - ): - if not orchestrator_completion_detected: - orchestrator_completion_detected = True - orchestrator_failed = is_failed - completion_time = time.time() - if is_failed: - logger.error( - f"Orchestrator workflow '{orchestrator_workflow_name}' FAILED! " - f"This indicates a quickstart bug, not a test issue." - ) - else: - logger.info( - f"Orchestrator workflow '{orchestrator_workflow_name}' completed! " - f"Unique workflow types: {len(completed_workflows)}, " - f"Total completion instances: {total_completions}" - ) + # If orchestrator_workflow_name is None, treat the first completed workflow as the orchestrator + is_orchestrator = ( + orchestrator_workflow_name is None + or workflow_name == orchestrator_workflow_name + ) + + if is_orchestrator and not orchestrator_completion_detected: + orchestrator_completion_detected = True + orchestrator_failed = is_failed + completion_time = time.time() + if is_failed: + logger.error( + f"Orchestrator workflow '{workflow_name}' FAILED! " + f"This indicates a quickstart bug, not a test issue." + ) + else: + logger.info( + f"Orchestrator workflow '{workflow_name}' completed! 
" + f"Unique workflow types: {len(completed_workflows)}, " + f"Total completion instances: {total_completions}" + ) # Also check for [agent-runner] completion message as a fallback if ( @@ -1260,8 +1300,8 @@ def _run_with_pubsub_trigger( publish_cmd = [ "dapr", "publish", - "--dapr-http-port", - str(dapr_http_port), + "--publish-app-id", + "e2e-test-publisher", "--pubsub", pubsub_name, "--topic", diff --git a/tests/integration/quickstarts/test_01_dapr_agents_fundamentals.py b/tests/integration/quickstarts/test_01_dapr_agents_fundamentals.py index f5e7f721d..67098353a 100644 --- a/tests/integration/quickstarts/test_01_dapr_agents_fundamentals.py +++ b/tests/integration/quickstarts/test_01_dapr_agents_fundamentals.py @@ -1,7 +1,10 @@ """Integration tests for 01-dapr-agents-fundamentals quickstart.""" import pytest -from tests.integration.quickstarts.conftest import run_quickstart_script +from tests.integration.quickstarts.conftest import ( + run_quickstart_multi_app, + run_quickstart_script, +) @pytest.mark.integration @@ -28,7 +31,7 @@ def test_01_llm_client(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="llm-client", - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( @@ -54,7 +57,7 @@ def test_02_agent_llm(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="agent-llm", - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( @@ -80,7 +83,7 @@ def test_03_agent_llm_tools(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="agent-llm", - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( @@ -100,7 +103,7 @@ def test_04_agent_mcp_tools(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="agent-mcp", - 
resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( @@ -124,7 +127,7 @@ def test_05_agent_memory(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="agent-memory", - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( @@ -151,7 +154,7 @@ def test_06_durable_agent_http(self, dapr_runtime): # noqa: ARG002 use_dapr=True, app_id="durable-agent", app_port=8001, - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", trigger_curl={ "url": "http://localhost:8001/run", "method": "POST", @@ -183,7 +186,7 @@ def test_07_durable_agent_pubsub(self, dapr_runtime): # noqa: ARG002 use_dapr=True, app_id="durable-agent-sub", dapr_http_port=3500, - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", trigger_pubsub={ "pubsub_name": "message-pubsub", "topic": "weather.requests", @@ -213,7 +216,7 @@ def test_08_workflow_llm(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="workflow-llms", - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( @@ -235,19 +238,17 @@ def test_09_workflow_agents(self, dapr_runtime): # noqa: ARG002 Note: dapr_runtime parameter ensures Dapr is initialized before this test runs. The fixture is needed for setup, even though we don't use the value directly. 
""" - script_path = self.quickstart_dir / "09_workflow_agents.py" - result = run_quickstart_script( - script_path, + dapr_yaml = self.quickstart_dir / "09_workflow_agents.yaml" + result = run_quickstart_multi_app( + dapr_yaml, cwd=self.quickstart_dir, env=self.env, timeout=180, - use_dapr=True, - app_id="workflow-agents", - resources_path=self.quickstart_dir / "resources", + stream_logs=True, ) assert result.returncode == 0, ( - f"Quickstart script '{script_path}' failed with return code {result.returncode}.\n" + f"Quickstart script '{dapr_yaml}' failed with return code {result.returncode}.\n" f"STDOUT:\n{result.stdout}\n" f"STDERR:\n{result.stderr}" ) @@ -269,7 +270,7 @@ def test_10_durable_agent_tracing(self, dapr_runtime): # noqa: ARG002 timeout=180, use_dapr=True, app_id="durable-agent-trace", - resources_path=self.quickstart_dir / "resources", + resources_path=self.quickstart_dir / "components", ) assert result.returncode == 0, ( diff --git a/tests/integration/quickstarts/test_03_durable_agent_tool_call.py b/tests/integration/quickstarts/test_03_durable_agent_tool_call.py index 073672f40..065dc26e4 100644 --- a/tests/integration/quickstarts/test_03_durable_agent_tool_call.py +++ b/tests/integration/quickstarts/test_03_durable_agent_tool_call.py @@ -128,30 +128,6 @@ def test_durable_weather_agent_openai(self, dapr_runtime): # noqa: ARG002 # expect some output assert len(result.stdout) > 0 or len(result.stderr) > 0 - def test_durable_weather_agent_tracing(self, dapr_runtime): # noqa: ARG002 - """Test durable weather agent tracing example (durable_weather_agent_tracing.py). - - Note: dapr_runtime parameter ensures Dapr is initialized before this test runs. - The fixture is needed for setup, even though we don't use the value directly. 
- """ - script = self.quickstart_dir / "durable_weather_agent_tracing.py" - result = run_quickstart_script( - script, - cwd=self.quickstart_dir, - env=self.env, - timeout=180, # Durable agents may take longer - use_dapr=True, - app_id="durableweatherapp", - ) - - assert result.returncode == 0, ( - f"Quickstart failed with return code {result.returncode}.\n" - f"STDOUT:\n{result.stdout}\n" - f"STDERR:\n{result.stderr}" - ) - # expect some output - assert len(result.stdout) > 0 or len(result.stderr) > 0 - def test_durable_weather_agent_serve(self, dapr_runtime): # noqa: ARG002 """Test durable weather agent serve example (durable_weather_agent_serve.py). diff --git a/tests/integration/quickstarts/test_04_agent_based_workflows.py b/tests/integration/quickstarts/test_04_agent_based_workflows.py index 52741ea82..964d150a0 100644 --- a/tests/integration/quickstarts/test_04_agent_based_workflows.py +++ b/tests/integration/quickstarts/test_04_agent_based_workflows.py @@ -1,7 +1,7 @@ """Integration tests for 04-agent-based-workflows quickstart.""" import pytest -from tests.integration.quickstarts.conftest import run_quickstart_script +from tests.integration.quickstarts.conftest import run_quickstart_multi_app @pytest.mark.integration @@ -20,14 +20,13 @@ def test_sequential_workflow(self, dapr_runtime): # noqa: ARG002 Note: dapr_runtime parameter ensures Dapr is initialized before this test runs. The fixture is needed for setup, even though we don't use the value directly. 
""" - script = self.quickstart_dir / "01_sequential_workflow.py" - result = run_quickstart_script( - script, + dapr_yaml = self.quickstart_dir / "sequential.yaml" + result = run_quickstart_multi_app( + dapr_yaml, cwd=self.quickstart_dir, env=self.env, timeout=180, # Agent workflows may take longer - use_dapr=True, - app_id="dapr-agent-planner", + stream_logs=True, ) assert result.returncode == 0, ( diff --git a/tests/integration/quickstarts/test_09_agent_observability.py b/tests/integration/quickstarts/test_09_agent_observability.py deleted file mode 100644 index 288d1e74f..000000000 --- a/tests/integration/quickstarts/test_09_agent_observability.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Integration tests for 09-agent-observability quickstart.""" - -import pytest -import time -import requests -from pathlib import Path -from tests.integration.quickstarts.conftest import run_quickstart_script - - -@pytest.mark.integration -class TestAgentObservabilityQuickstart: - """Integration tests for 09-agent-observability quickstart.""" - - @pytest.fixture(autouse=True) - def setup(self, quickstarts_dir, openai_api_key): - """Setup test environment.""" - self.quickstart_dir = quickstarts_dir / "09-agent-observability" - self.env = {"OPENAI_API_KEY": openai_api_key} - - def test_zipkin_tracing(self, zipkin_service): - """Test agent with Zipkin tracing (01_agent_zipkin.py).""" - # zipkin_service fixture already waits for service to be ready - # and returns service info including endpoints - - script = self.quickstart_dir / "01_agent_zipkin.py" - result = run_quickstart_script( - script, - cwd=self.quickstart_dir, - env=self.env, - timeout=60, - ) - - assert result.returncode == 0, ( - f"Quickstart failed with return code {result.returncode}.\n" - f"STDOUT:\n{result.stdout}\n" - f"STDERR:\n{result.stderr}" - ) - - # Give Zipkin time to receive spans - time.sleep(2) - - # Verify traces were sent to Zipkin - try: - # Query Zipkin for traces using the endpoint from fixture - response = 
requests.get(zipkin_service["api_endpoint"], timeout=5) - if response.status_code == 200: - traces = response.json() - # At least one trace should exist - assert isinstance(traces, list) - except requests.exceptions.RequestException: - raise - - def test_otel_tracing(self, jaeger_service): - """Test agent with OpenTelemetry tracing (02_agent_otel.py).""" - # jaeger_service fixture already waits for service to be ready - # and returns service info including endpoints - - script = self.quickstart_dir / "02_agent_otel.py" - result = run_quickstart_script( - script, - cwd=self.quickstart_dir, - env=self.env, - timeout=60, - ) - - # Check if script ran - assert result.returncode == 0, ( - f"Quickstart failed with return code {result.returncode}.\n" - f"STDOUT:\n{result.stdout}\n" - f"STDERR:\n{result.stderr}" - ) - - # Give Jaeger time to receive traces - time.sleep(2) - - # Verify traces were sent to Jaeger - try: - # Query Jaeger API for traces using the endpoint from fixture - response = requests.get( - f"{jaeger_service['endpoint']}/api/traces?service=dapr-weather-agents", - timeout=5, - ) - if response.status_code == 200: - data = response.json() - # Verify response structure - assert "data" in data or "traces" in data - except requests.exceptions.RequestException: - # If we can't verify, that's okay - the test still passed if script ran - pass diff --git a/tests/integration/quickstarts/test_09_agents_as_activities_observability b/tests/integration/quickstarts/test_09_agents_as_activities_observability.py similarity index 89% rename from tests/integration/quickstarts/test_09_agents_as_activities_observability rename to tests/integration/quickstarts/test_09_agents_as_activities_observability.py index 056f86e41..51bceca6a 100644 --- a/tests/integration/quickstarts/test_09_agents_as_activities_observability +++ b/tests/integration/quickstarts/test_09_agents_as_activities_observability.py @@ -1,6 +1,7 @@ """Integration tests for 09-agents-as-activities-observability 
quickstart.""" + import pytest -from tests.integration.quickstarts.conftest import run_quickstart_script +from tests.integration.quickstarts.conftest import run_quickstart_multi_app @pytest.mark.integration @@ -19,14 +20,13 @@ def test_sequential_workflow(self, dapr_runtime): # noqa: ARG002 Note: dapr_runtime parameter ensures Dapr is initialized before this test runs. The fixture is needed for setup, even though we don't use the value directly. """ - script = self.quickstart_dir / "sequential_workflow.py" - result = run_quickstart_script( - script, + dapr_yaml = self.quickstart_dir / "sequential.yaml" + result = run_quickstart_multi_app( + dapr_yaml, cwd=self.quickstart_dir, env=self.env, timeout=180, # Agent workflows may take longer - use_dapr=True, - app_id="dapr-agent-wf", + stream_logs=True, ) assert result.returncode == 0, ( diff --git a/tests/workflow/test_activity_decorators.py b/tests/workflow/test_activity_decorators.py deleted file mode 100644 index bfaec925b..000000000 --- a/tests/workflow/test_activity_decorators.py +++ /dev/null @@ -1,446 +0,0 @@ -import pytest -from unittest.mock import MagicMock, AsyncMock, patch -from typing import List -from pydantic import BaseModel, Field - -from dapr_agents.workflow.decorators.activities import llm_activity, agent_activity -from dapr_agents.llm.chat import ChatClientBase -from dapr_agents.agents.base import AgentBase -from dapr_agents.types import LLMChatResponse, AssistantMessage - - -# Test Models -class Person(BaseModel): - """Test model for structured responses.""" - - name: str = Field(..., description="Person's name") - age: int = Field(..., description="Person's age") - - -class QuestionList(BaseModel): - """Test model for list responses.""" - - questions: List[str] = Field(..., description="List of questions") - - -# Fixtures -@pytest.fixture -def mock_llm_client(): - """Mock LLM client that returns test responses.""" - mock_client = MagicMock(spec=ChatClientBase) - mock_client.generate = 
MagicMock(return_value="Test response from LLM") - return mock_client - - -@pytest.fixture -def mock_llm_client_async(): - """Mock async LLM client.""" - mock_client = MagicMock(spec=ChatClientBase) - mock_client.generate = AsyncMock(return_value="Async test response") - return mock_client - - -@pytest.fixture -def mock_llm_client_structured(): - """Mock LLM client that returns structured (LLMChatResponse) responses.""" - from dapr_agents.types import LLMChatCandidate - - mock_client = MagicMock(spec=ChatClientBase) - candidate = LLMChatCandidate( - message=AssistantMessage(content="Structured response"), - finish_reason="stop", - ) - response = LLMChatResponse( - results=[candidate], - metadata={"model": "test-model"}, - ) - mock_client.generate = MagicMock(return_value=response) - return mock_client - - -@pytest.fixture -def mock_agent(): - """Mock agent that returns test responses.""" - - class DummyAgent: - def __init__(self) -> None: - self.calls: list[str] = [] - - async def run(self, prompt: str) -> str: - self.calls.append(prompt) - return "Agent response" - - return DummyAgent() - - -@pytest.fixture -def mock_workflow_context(): - """Mock WorkflowActivityContext.""" - ctx = MagicMock() - ctx.instance_id = "test-instance-123" - return ctx - - -# Tests for llm_activity decorator -def test_llm_activity_requires_prompt(): - """Test that llm_activity raises ValueError when prompt is empty.""" - mock_llm = MagicMock(spec=ChatClientBase) - - with pytest.raises(ValueError, match="@llm_activity requires a prompt template"): - llm_activity(prompt="", llm=mock_llm) - - -def test_llm_activity_requires_llm(): - """Test that llm_activity raises ValueError when llm is None.""" - with pytest.raises( - ValueError, match="@llm_activity requires an explicit `llm` client instance" - ): - llm_activity(prompt="Test prompt", llm=None) - - -def test_llm_activity_decorator_basic(mock_llm_client, mock_workflow_context): - """Test basic llm_activity decoration and execution.""" - - 
@llm_activity(prompt="Say hello to {name}", llm=mock_llm_client) - def greet(ctx, name: str) -> str: - pass - - # Check wrapper metadata - assert hasattr(greet, "_is_llm_activity") - assert greet._is_llm_activity is True - assert hasattr(greet, "_llm_activity_config") - assert greet._llm_activity_config["prompt"] == "Say hello to {name}" - - # Execute the decorated function - result = greet(mock_workflow_context, payload={"name": "Alice"}) - - # Verify the result - assert result == "Test response from LLM" - mock_llm_client.generate.assert_called_once() - - -def test_llm_activity_positional_args(mock_llm_client, mock_workflow_context): - """Test llm_activity with positional arguments.""" - - @llm_activity(prompt="Process {text}", llm=mock_llm_client) - def process_text(ctx, text: str) -> str: - pass - - result = process_text(mock_workflow_context, {"text": "Hello World"}) - assert result == "Test response from LLM" - - -def test_llm_activity_keyword_args(mock_llm_client, mock_workflow_context): - """Test llm_activity with keyword arguments.""" - - @llm_activity(prompt="Summarize {content}", llm=mock_llm_client) - def summarize(ctx, content: str) -> str: - pass - - result = summarize(ctx=mock_workflow_context, payload={"content": "Long text"}) - assert result == "Test response from LLM" - - -def test_llm_activity_multiple_params(mock_llm_client, mock_workflow_context): - """Test llm_activity with multiple parameters.""" - - @llm_activity(prompt="Write a {length} story about {topic}", llm=mock_llm_client) - def write_story(ctx, topic: str, length: str) -> str: - pass - - result = write_story( - mock_workflow_context, payload={"topic": "robots", "length": "short"} - ) - assert result == "Test response from LLM" - - -def test_llm_activity_no_payload(mock_llm_client, mock_workflow_context): - """Test llm_activity with no input payload.""" - - @llm_activity(prompt="Generate a random fact", llm=mock_llm_client) - def random_fact(ctx) -> str: - pass - - result = 
random_fact(mock_workflow_context) - assert result == "Test response from LLM" - - -def test_llm_activity_async_llm(mock_llm_client_async, mock_workflow_context): - """Test llm_activity with async LLM client.""" - - @llm_activity(prompt="Test prompt", llm=mock_llm_client_async) - def async_test(ctx) -> str: - pass - - result = async_test(mock_workflow_context) - assert result == "Async test response" - - -def test_llm_activity_structured_response( - mock_llm_client_structured, mock_workflow_context -): - """Test llm_activity with LLMChatResponse (structured response).""" - - @llm_activity(prompt="Get info", llm=mock_llm_client_structured) - def get_info(ctx) -> str: - pass - - result = get_info(mock_workflow_context) - # convert_result should extract the content from LLMChatResponse - assert result == "Structured response" - - -def test_llm_activity_structured_mode(mock_llm_client, mock_workflow_context): - """Test llm_activity with different structured modes.""" - - @llm_activity( - prompt="Get data", llm=mock_llm_client, structured_mode="function_call" - ) - def get_data(ctx) -> str: - pass - - assert get_data._llm_activity_config["structured_mode"] == "function_call" - - -def test_llm_activity_preserves_function_metadata(mock_llm_client): - """Test that llm_activity preserves function name and docstring.""" - - @llm_activity(prompt="Test", llm=mock_llm_client) - def my_function(ctx, param: str) -> str: - """This is a docstring.""" - pass - - assert my_function.__name__ == "my_function" - assert my_function.__doc__ == "This is a docstring." 
- - -# Tests for agent_activity decorator - - -def test_agent_activity_requires_agent(): - """Test that agent_activity raises ValueError when agent is None.""" - with pytest.raises(ValueError, match="@agent_activity requires an AgentBase"): - agent_activity(agent=None) - - -def test_agent_activity_decorator_basic(mock_agent, mock_workflow_context): - """Test basic agent_activity decoration and execution.""" - - @agent_activity(agent=mock_agent) - def run_task(ctx, task: str) -> str: - pass - - # Check wrapper metadata - assert hasattr(run_task, "_is_agent_activity") - assert run_task._is_agent_activity is True - assert hasattr(run_task, "_agent_activity_config") - - # Execute the decorated function - result = run_task(mock_workflow_context, payload={"task": "analyze data"}) - - # Verify the result - assert result == "Agent response" - assert mock_agent.calls == ["analyze data"] - - -def test_agent_activity_with_prompt(mock_agent, mock_workflow_context): - """Test agent_activity with custom prompt template.""" - - @agent_activity(agent=mock_agent, prompt="Analyze {data} and provide insights") - def analyze(ctx, data: str) -> str: - pass - - result = analyze(mock_workflow_context, payload={"data": "sales numbers"}) - assert result == "Agent response" - - # Verify the agent was called with formatted prompt - assert mock_agent.calls == ["Analyze sales numbers and provide insights"] - - -def test_agent_activity_without_prompt(mock_agent, mock_workflow_context): - """Test agent_activity without prompt (uses format_agent_input).""" - - @agent_activity(agent=mock_agent) - def process(ctx, input_data: str) -> str: - pass - - result = process(mock_workflow_context, payload={"input_data": "test data"}) - assert result == "Agent response" - assert mock_agent.calls == ["test data"] - - -def test_agent_activity_multiple_params(mock_agent, mock_workflow_context): - """Test agent_activity with multiple parameters.""" - - @agent_activity(agent=mock_agent, prompt="Compare {item1} 
and {item2}") - def compare(ctx, item1: str, item2: str) -> str: - pass - - result = compare( - mock_workflow_context, payload={"item1": "apple", "item2": "orange"} - ) - assert result == "Agent response" - assert mock_agent.calls == ["Compare apple and orange"] - - -def test_agent_activity_preserves_function_metadata(mock_agent): - """Test that agent_activity preserves function name and docstring.""" - - @agent_activity(agent=mock_agent) - def agent_function(ctx, task: str) -> str: - """Agent function docstring.""" - pass - - assert agent_function.__name__ == "agent_function" - assert agent_function.__doc__ == "Agent function docstring." - - -# Integration tests - - -def test_llm_activity_with_pydantic_return_type(mock_llm_client, mock_workflow_context): - """Test llm_activity with Pydantic model return type annotation.""" - # Mock the LLM to return a dict that matches Person schema - mock_llm_client.generate = MagicMock(return_value='{"name": "John Doe", "age": 30}') - - @llm_activity(prompt="Get person info for {person_id}", llm=mock_llm_client) - def get_person(ctx, person_id: str) -> Person: - pass - - async def _return_person(*args, **kwargs): - return Person(name="John Doe", age=30) - - with patch( - "dapr_agents.workflow.decorators.activities.validate_result" - ) as mock_validate: - mock_validate.side_effect = _return_person - _ = get_person(mock_workflow_context, payload={"person_id": "123"}) - - # Verify the decorator called the LLM correctly - assert mock_llm_client.generate.called - - -def test_llm_activity_ctx_parameter_stripped(mock_llm_client, mock_workflow_context): - """Test that ctx parameter is properly stripped from signature processing.""" - - @llm_activity(prompt="Process {data}", llm=mock_llm_client) - def process(ctx, data: str) -> str: - pass - - # The decorator should handle ctx internally and not include it in prompt formatting - result = process(mock_workflow_context, payload={"data": "test"}) - assert result == "Test response from LLM" 
- - # Verify that the LLM was called with proper parameters (not including ctx) - call_args = mock_llm_client.generate.call_args - assert call_args is not None - - -def test_agent_activity_ctx_parameter_stripped(mock_agent, mock_workflow_context): - """Test that ctx parameter is properly stripped from signature processing.""" - - @agent_activity(agent=mock_agent, prompt="Process {input_val}") - def process(ctx, input_val: str) -> str: - pass - - result = process(mock_workflow_context, payload={"input_val": "data"}) - assert result == "Agent response" - assert mock_agent.calls == ["Process data"] - - -def test_llm_activity_scalar_input(mock_llm_client, mock_workflow_context): - """Test llm_activity with scalar (non-dict) input.""" - - @llm_activity(prompt="Analyze {text}", llm=mock_llm_client) - def analyze(ctx, text: str) -> str: - pass - - # Pass a scalar string instead of a dict - result = analyze(mock_workflow_context, "Simple text input") - assert result == "Test response from LLM" - - -def test_agent_activity_scalar_input(mock_agent, mock_workflow_context): - """Test agent_activity with scalar (non-dict) input.""" - - @agent_activity(agent=mock_agent) - def process(ctx, data: str) -> str: - pass - - result = process(mock_workflow_context, "scalar input") - assert result == "Agent response" - assert mock_agent.calls == ["scalar input"] - - -def test_llm_activity_with_task_kwargs(mock_llm_client, mock_workflow_context): - """Test llm_activity with additional task_kwargs.""" - - @llm_activity( - prompt="Test", - llm=mock_llm_client, - custom_param="custom_value", - another_param=123, - ) - def test_func(ctx) -> str: - pass - - assert ( - test_func._llm_activity_config["task_kwargs"]["custom_param"] == "custom_value" - ) - assert test_func._llm_activity_config["task_kwargs"]["another_param"] == 123 - - -def test_agent_activity_with_task_kwargs(mock_agent, mock_workflow_context): - """Test agent_activity with additional task_kwargs.""" - - 
@agent_activity(agent=mock_agent, custom_setting="value", timeout=300) - def test_func(ctx) -> str: - pass - - assert test_func._agent_activity_config["task_kwargs"]["custom_setting"] == "value" - assert test_func._agent_activity_config["task_kwargs"]["timeout"] == 300 - result = test_func(mock_workflow_context) - assert result == "Agent response" - assert len(mock_agent.calls) == 1 - - -# Error handling tests - - -def test_llm_activity_non_callable_raises_error(mock_llm_client): - """Test that decorating a non-callable raises ValueError.""" - decorator = llm_activity(prompt="Test", llm=mock_llm_client) - - with pytest.raises(ValueError, match="must decorate a callable activity"): - decorator("not a function") - - -def test_agent_activity_non_callable_raises_error(mock_agent): - """Test that decorating a non-callable raises ValueError.""" - decorator = agent_activity(agent=mock_agent) - - with pytest.raises(ValueError, match="must decorate a callable activity"): - decorator("not a function") - - -def test_llm_activity_missing_context_raises_error(mock_llm_client): - """Test that calling without context raises ValueError.""" - - @llm_activity(prompt="Test", llm=mock_llm_client) - def test_func(ctx) -> str: - pass - - # Call without any context - should raise error from extract_ctx_and_payload - with pytest.raises(ValueError, match="Workflow context is required"): - test_func() - - -def test_agent_activity_missing_context_raises_error(mock_agent): - """Test that calling without context raises ValueError.""" - - @agent_activity(agent=mock_agent) - def test_func(ctx) -> str: - pass - - with pytest.raises(ValueError, match="Workflow context is required"): - test_func() diff --git a/tests/workflow/test_message_router.py b/tests/workflow/test_message_router.py index ec049a3d4..b9c500ceb 100644 --- a/tests/workflow/test_message_router.py +++ b/tests/workflow/test_message_router.py @@ -5,7 +5,7 @@ from unittest.mock import MagicMock from pydantic import BaseModel, Field 
-from dapr_agents.workflow.decorators.routers import message_router +from dapr_agents.workflow.decorators.decorators import message_router from dapr_agents.workflow.utils.routers import ( extract_message_models, extract_cloudevent_data, diff --git a/uv.lock b/uv.lock index af9a05762..4a0504edd 100644 --- a/uv.lock +++ b/uv.lock @@ -36,7 +36,6 @@ members = [ "07-agent-mcp-client-stdio", "07-agent-mcp-client-streamablehttp", "08-data-agent-mcp-chainlit", - "09-agent-observability", "09-agents-as-activities-observability", "dapr-agents", ] @@ -343,34 +342,17 @@ requires-dist = [ { name = "websockets", specifier = ">=12.0" }, ] -[[package]] -name = "09-agent-observability" -version = "0.1.0" -source = { virtual = "quickstarts/09-agent-observability" } -dependencies = [ - { name = "dapr-agents" }, - { name = "python-dotenv" }, -] - -[package.metadata] -requires-dist = [ - { name = "dapr-agents", editable = "." }, - { name = "python-dotenv", specifier = ">=1.2.1" }, -] - [[package]] name = "09-agents-as-activities-observability" version = "0.1.0" source = { virtual = "quickstarts/09-agents-as-activities-observability" } dependencies = [ - { name = "arize-phoenix" }, { name = "dapr-agents" }, { name = "python-dotenv" }, ] [package.metadata] requires-dist = [ - { name = "arize-phoenix", specifier = ">=12.16.0" }, { name = "dapr-agents", editable = "." }, { name = "python-dotenv", specifier = ">=1.2.1" }, ]