diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py index 93fc5e991c..b6a49ecab8 100644 --- a/python/packages/core/agent_framework/_workflows/_sequential.py +++ b/python/packages/core/agent_framework/_workflows/_sequential.py @@ -37,7 +37,7 @@ """ # noqa: E501 import logging -from collections.abc import Sequence +from collections.abc import Callable, Sequence from typing import Any from agent_framework import AgentProtocol, ChatMessage @@ -72,11 +72,7 @@ async def from_message(self, message: ChatMessage, ctx: WorkflowContext[list[Cha await ctx.send_message(normalize_messages_input(message)) @handler - async def from_messages( - self, - messages: list[str | ChatMessage], - ctx: WorkflowContext[list[ChatMessage]], - ) -> None: + async def from_messages(self, messages: list[str | ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None: await ctx.send_message(normalize_messages_input(messages)) @@ -102,7 +98,10 @@ async def end(self, conversation: list[ChatMessage], ctx: WorkflowContext[Any, l class SequentialBuilder: r"""High-level builder for sequential agent/executor workflows with shared context. - - `participants([...])` accepts a list of AgentProtocol (recommended) or Executor + - `participants([...])` accepts a list of AgentProtocol (recommended) or Executor instances + - `register_participants([...])` accepts a list of factories for AgentProtocol (recommended) + or Executor factories + - Executors must define a handler that consumes list[ChatMessage] and sends out a list[ChatMessage] - The workflow wires participants in order, passing a list[ChatMessage] down the chain - Agents append their assistant messages to the conversation - Custom executors can transform/summarize and return a list[ChatMessage] @@ -114,8 +113,14 @@ class SequentialBuilder: from agent_framework import SequentialBuilder + # With agent instances workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build() + # With agent factories + workflow = ( + SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build() + ) + # Enable checkpoint persistence workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build() @@ -133,16 +138,38 @@ class SequentialBuilder: def __init__(self) -> None: self._participants: list[AgentProtocol | Executor] = [] + self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] self._checkpoint_storage: CheckpointStorage | None = None self._request_info_enabled: bool = False self._request_info_filter: set[str] | None = None + def register_participants( + self, + participant_factories: Sequence[Callable[[], AgentProtocol | Executor]], + ) -> "SequentialBuilder": + """Register participant factories for this sequential workflow.""" + if self._participants: + raise ValueError( + "Cannot mix .participants([...]) and .register_participants() in the same builder instance." + ) + + if not participant_factories: + raise ValueError("participant_factories cannot be empty") + + self._participant_factories = list(participant_factories) + return self + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "SequentialBuilder": """Define the ordered participants for this sequential workflow. Accepts AgentProtocol instances (auto-wrapped as AgentExecutor) or Executor instances. Raises if empty or duplicates are provided for clarity. """ + if self._participant_factories: + raise ValueError( + "Cannot mix .participants([...]) and .register_participants() in the same builder instance." + ) + if not participants: raise ValueError("participants cannot be empty") @@ -217,13 +244,22 @@ def build(self) -> Workflow: - _InputToConversation normalizes the initial input into list[ChatMessage] - For each participant in order: - If Agent (or AgentExecutor): pass conversation to the agent, then optionally - route through human input interceptor, then convert response to conversation + route through a request info interceptor, then convert response to conversation via _ResponseToConversation - Else (custom Executor): pass conversation directly to the executor - _EndWithConversation yields the final conversation and the workflow becomes idle """ - if not self._participants: - raise ValueError("No participants provided. Call .participants([...]) first.") + if not self._participants and not self._participant_factories: + raise ValueError( + "No participants or participant factories provided to the builder. " + "Use .participants([...]) or .ss([...])." + ) + + if self._participants and self._participant_factories: + # Defensive strategy: this should never happen due to checks in respective methods + raise ValueError( + "Cannot mix .participants([...]) and .register_participants() in the same builder instance." + ) # Internal nodes input_conv = _InputToConversation(id="input-conversation") @@ -235,13 +271,17 @@ def build(self) -> Workflow: # Start of the chain is the input normalizer prior: Executor | AgentProtocol = input_conv - for p in self._participants: - # Agent-like branch: either explicitly an AgentExecutor or any non-AgentExecutor - if not (isinstance(p, Executor) and not isinstance(p, AgentExecutor)): - # input conversation -> [human_input_interceptor] -> (agent) -> response -> conversation - label: str - label = p.id if isinstance(p, Executor) else getattr(p, "name", None) or p.__class__.__name__ - resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}") + participants: list[Executor | AgentProtocol] = [] + if self._participant_factories: + for factory in self._participant_factories: + p = factory() + participants.append(p) + else: + participants = self._participants + + for p in participants: + if isinstance(p, (AgentProtocol, AgentExecutor)): + label = p.id if isinstance(p, AgentExecutor) else p.display_name if self._request_info_enabled: # Insert request info interceptor BEFORE the agent @@ -254,13 +294,15 @@ def build(self) -> Workflow: else: builder.add_edge(prior, p) + resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}") builder.add_edge(p, resp_to_conv) prior = resp_to_conv elif isinstance(p, Executor): # Custom executor operates on list[ChatMessage] + # If the executor doesn't handle list[ChatMessage] correctly, validation will fail builder.add_edge(prior, p) prior = p - else: # pragma: no cover - defensive + else: raise TypeError(f"Unsupported participant type: {type(p).__name__}") # Terminate with the final conversation diff --git a/python/packages/core/agent_framework/_workflows/_typing_utils.py b/python/packages/core/agent_framework/_workflows/_typing_utils.py index 2c57f69b84..5619fb9bf3 100644 --- a/python/packages/core/agent_framework/_workflows/_typing_utils.py +++ b/python/packages/core/agent_framework/_workflows/_typing_utils.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. import logging -from dataclasses import fields, is_dataclass from types import UnionType from typing import Any, TypeVar, Union, cast, get_args, get_origin @@ -10,67 +9,6 @@ T = TypeVar("T") -def _coerce_to_type(value: Any, target_type: type[T]) -> T | None: - """Best-effort conversion of value into target_type. - - Args: - value: The value to convert (can be dict, dataclass, or object with __dict__) - target_type: The target type to convert to - - Returns: - Instance of target_type if conversion succeeds, None otherwise - """ - if isinstance(value, target_type): - return value # type: ignore[return-value] - - # Convert dataclass instances or objects with __dict__ into dict first - value_as_dict: dict[str, Any] - if not isinstance(value, dict): - if is_dataclass(value): - value_as_dict = {f.name: getattr(value, f.name) for f in fields(value)} - else: - value_dict = getattr(value, "__dict__", None) - if isinstance(value_dict, dict): - value_as_dict = cast(dict[str, Any], value_dict) - else: - return None - else: - value_as_dict = cast(dict[str, Any], value) - - # Try to construct the target type from the dict - ctor_kwargs: dict[str, Any] = dict(value_as_dict) - - if is_dataclass(target_type): - field_names = {f.name for f in fields(target_type)} - ctor_kwargs = {k: v for k, v in value_as_dict.items() if k in field_names} - - try: - return target_type(**ctor_kwargs) # type: ignore[call-arg,return-value] - except TypeError as exc: - logger.debug(f"_coerce_to_type could not call {target_type.__name__}(**..): {exc}") - except Exception as exc: # pragma: no cover - unexpected constructor failure - logger.warning( - f"_coerce_to_type encountered unexpected error calling {target_type.__name__} constructor: {exc}" - ) - - # Fallback: try to create instance without __init__ and set attributes - try: - instance = object.__new__(target_type) - except Exception as exc: # pragma: no cover - pathological type - logger.debug(f"_coerce_to_type could not allocate {target_type.__name__} without __init__: {exc}") - return None - - for key, val in value_as_dict.items(): - try: - setattr(instance, key, val) - except Exception as exc: - logger.debug( - f"_coerce_to_type could not set {target_type.__name__}.{key} during fallback assignment: {exc}" - ) - continue - return instance # type: ignore[return-value] - - def is_instance_of(data: Any, target_type: type | UnionType | Any) -> bool: """Check if the data is an instance of the target type. diff --git a/python/packages/core/tests/workflow/test_sequential.py b/python/packages/core/tests/workflow/test_sequential.py index 165d764725..8ff0098c38 100644 --- a/python/packages/core/tests/workflow/test_sequential.py +++ b/python/packages/core/tests/workflow/test_sequential.py @@ -15,6 +15,7 @@ Role, SequentialBuilder, TextContent, + TypeCompatibilityError, WorkflowContext, WorkflowOutputEvent, WorkflowRunState, @@ -58,11 +59,43 @@ async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[ await ctx.send_message(list(conversation) + [summary]) +class _InvalidExecutor(Executor): + """Invalid executor that does not have a handler that accepts a list of chat messages""" + + @handler + async def summarize(self, conversation: list[str], ctx: WorkflowContext[list[ChatMessage]]) -> None: + pass + + def test_sequential_builder_rejects_empty_participants() -> None: with pytest.raises(ValueError): SequentialBuilder().participants([]) +def test_sequential_builder_rejects_empty_participant_factories() -> None: + with pytest.raises(ValueError): + SequentialBuilder().register_participants([]) + + +def test_sequential_builder_rejects_mixing_participants_and_factories() -> None: + """Test that mixing .participants() and .register_participants() raises an error.""" + a1 = _EchoAgent(id="agent1", name="A1") + + # Try .participants() then .register_participants() + with pytest.raises(ValueError, match="Cannot mix"): + SequentialBuilder().participants([a1]).register_participants([lambda: _EchoAgent(id="agent2", name="A2")]) + + # Try .register_participants() then .participants() + with pytest.raises(ValueError, match="Cannot mix"): + SequentialBuilder().register_participants([lambda: _EchoAgent(id="agent1", name="A1")]).participants([a1]) + + +def test_sequential_builder_validation_rejects_invalid_executor() -> None: + """Test that adding an invalid executor to the builder raises an error.""" + with pytest.raises(TypeCompatibilityError): + SequentialBuilder().participants([_EchoAgent(id="agent1", name="A1"), _InvalidExecutor(id="invalid")]).build() + + async def test_sequential_agents_append_to_context() -> None: a1 = _EchoAgent(id="agent1", name="A1") a2 = _EchoAgent(id="agent2", name="A2") @@ -91,6 +124,37 @@ async def test_sequential_agents_append_to_context() -> None: assert "A2 reply" in msgs[2].text +async def test_sequential_register_participants_with_agent_factories() -> None: + """Test that register_participants works with agent factories.""" + + def create_agent1() -> _EchoAgent: + return _EchoAgent(id="agent1", name="A1") + + def create_agent2() -> _EchoAgent: + return _EchoAgent(id="agent2", name="A2") + + wf = SequentialBuilder().register_participants([create_agent1, create_agent2]).build() + + completed = False + output: list[ChatMessage] | None = None + async for ev in wf.run_stream("hello factories"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = ev.data + if completed and output is not None: + break + + assert completed + assert output is not None + assert isinstance(output, list) + msgs: list[ChatMessage] = output + assert len(msgs) == 3 + assert msgs[0].role == Role.USER and "hello factories" in msgs[0].text + assert msgs[1].role == Role.ASSISTANT and "A1 reply" in msgs[1].text + assert msgs[2].role == Role.ASSISTANT and "A2 reply" in msgs[2].text + + async def test_sequential_with_custom_executor_summary() -> None: a1 = _EchoAgent(id="agent1", name="A1") summarizer = _SummarizerExec(id="summarizer") @@ -103,7 +167,7 @@ async def test_sequential_with_custom_executor_summary() -> None: if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True elif isinstance(ev, WorkflowOutputEvent): - output = ev.data # type: ignore[assignment] + output = ev.data if completed and output is not None: break @@ -117,6 +181,37 @@ async def test_sequential_with_custom_executor_summary() -> None: assert msgs[2].role == Role.ASSISTANT and msgs[2].text.startswith("Summary of users:") +async def test_sequential_register_participants_mixed_agents_and_executors() -> None: + """Test register_participants with both agent and executor factories.""" + + def create_agent() -> _EchoAgent: + return _EchoAgent(id="agent1", name="A1") + + def create_summarizer() -> _SummarizerExec: + return _SummarizerExec(id="summarizer") + + wf = SequentialBuilder().register_participants([create_agent, create_summarizer]).build() + + completed = False + output: list[ChatMessage] | None = None + async for ev in wf.run_stream("topic Y"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = ev.data + if completed and output is not None: + break + + assert completed + assert output is not None + msgs: list[ChatMessage] = output + # Expect: [user, A1 reply, summary] + assert len(msgs) == 3 + assert msgs[0].role == Role.USER and "topic Y" in msgs[0].text + assert msgs[1].role == Role.ASSISTANT and "A1 reply" in msgs[1].text + assert msgs[2].role == Role.ASSISTANT and msgs[2].text.startswith("Summary of users:") + + async def test_sequential_checkpoint_resume_round_trip() -> None: storage = InMemoryCheckpointStorage() @@ -229,3 +324,130 @@ async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" + + +async def test_sequential_register_participants_with_checkpointing() -> None: + """Test that checkpointing works with register_participants.""" + storage = InMemoryCheckpointStorage() + + def create_agent1() -> _EchoAgent: + return _EchoAgent(id="agent1", name="A1") + + def create_agent2() -> _EchoAgent: + return _EchoAgent(id="agent2", name="A2") + + wf = SequentialBuilder().register_participants([create_agent1, create_agent2]).with_checkpointing(storage).build() + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("checkpoint with factories"): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + break + + assert baseline_output is not None + + checkpoints = await storage.list_checkpoints() + assert checkpoints + checkpoints.sort(key=lambda cp: cp.timestamp) + + resume_checkpoint = next( + (cp for cp in checkpoints if (cp.metadata or {}).get("checkpoint_type") == "superstep"), + checkpoints[-1], + ) + + wf_resume = ( + SequentialBuilder().register_participants([create_agent1, create_agent2]).with_checkpointing(storage).build() + ) + + resumed_output: list[ChatMessage] | None = None + async for ev in wf_resume.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id): + if isinstance(ev, WorkflowOutputEvent): + resumed_output = ev.data + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert resumed_output is not None + assert [m.role for m in resumed_output] == [m.role for m in baseline_output] + assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + + +async def test_sequential_register_participants_factories_called_on_build() -> None: + """Test that factories are called during build(), not during register_participants().""" + call_count = 0 + + def create_agent() -> _EchoAgent: + nonlocal call_count + call_count += 1 + return _EchoAgent(id=f"agent{call_count}", name=f"A{call_count}") + + builder = SequentialBuilder().register_participants([create_agent, create_agent]) + + # Factories should not be called yet + assert call_count == 0 + + wf = builder.build() + + # Now factories should have been called + assert call_count == 2 + + # Run the workflow to ensure it works + completed = False + output: list[ChatMessage] | None = None + async for ev in wf.run_stream("test factories timing"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = ev.data # type: ignore[assignment] + if completed and output is not None: + break + + assert completed + assert output is not None + msgs: list[ChatMessage] = output + # Should have user message + 2 agent replies + assert len(msgs) == 3 + + +async def test_sequential_builder_reusable_after_build_with_participants() -> None: + """Test that the builder can be reused to build multiple identical workflows with participants().""" + a1 = _EchoAgent(id="agent1", name="A1") + a2 = _EchoAgent(id="agent2", name="A2") + + builder = SequentialBuilder().participants([a1, a2]) + + # Build first workflow + builder.build() + + assert builder._participants[0] is a1 # type: ignore + assert builder._participants[1] is a2 # type: ignore + assert builder._participant_factories == [] # type: ignore + + +async def test_sequential_builder_reusable_after_build_with_factories() -> None: + """Test that the builder can be reused to build multiple workflows with register_participants().""" + call_count = 0 + + def create_agent1() -> _EchoAgent: + nonlocal call_count + call_count += 1 + return _EchoAgent(id="agent1", name="A1") + + def create_agent2() -> _EchoAgent: + nonlocal call_count + call_count += 1 + return _EchoAgent(id="agent2", name="A2") + + builder = SequentialBuilder().register_participants([create_agent1, create_agent2]) + + # Build first workflow - factories should be called + builder.build() + + assert call_count == 2 + assert builder._participants == [] # type: ignore + assert len(builder._participant_factories) == 2 # type: ignore + assert builder._participant_factories[0] is create_agent1 # type: ignore + assert builder._participant_factories[1] is create_agent2 # type: ignore diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index e1e18eab91..8f193eef72 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -124,6 +124,7 @@ For additional observability samples in Agent Framework, see the [observability | Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints | | Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context | | Sequential Orchestration (Custom Executor) | [orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary | +| Sequential Orchestration (Participant Factories) | [orchestration/sequential_participant_factory.py](./orchestration/sequential_participant_factory.py) | Use participant factories for state isolation between workflow instances | **Magentic checkpointing tip**: Treat `MagenticBuilder.participants` keys as stable identifiers. When resuming from a checkpoint, the rebuilt workflow must reuse the same participant names; otherwise the checkpoint cannot be applied and the run will fail fast. diff --git a/python/samples/getting_started/workflows/orchestration/sequential_custom_executors.py b/python/samples/getting_started/workflows/orchestration/sequential_custom_executors.py index ec203ffb4c..104a833603 100644 --- a/python/samples/getting_started/workflows/orchestration/sequential_custom_executors.py +++ b/python/samples/getting_started/workflows/orchestration/sequential_custom_executors.py @@ -13,7 +13,6 @@ ) from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential -from typing_extensions import Never """ Sample: Sequential workflow mixing agents and a custom summarizer executor @@ -42,12 +41,12 @@ class Summarizer(Executor): """Simple summarizer: consumes full conversation and appends an assistant summary.""" @handler - async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[Never, list[ChatMessage]]) -> None: + async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None: users = sum(1 for m in conversation if m.role == Role.USER) assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT) summary = ChatMessage(role=Role.ASSISTANT, text=f"Summary -> users:{users} assistants:{assistants}") final_conversation = list(conversation) + [summary] - await ctx.yield_output(final_conversation) + await ctx.send_message(final_conversation) async def main() -> None: diff --git a/python/samples/getting_started/workflows/orchestration/sequential_participant_factory.py b/python/samples/getting_started/workflows/orchestration/sequential_participant_factory.py new file mode 100644 index 0000000000..1073173413 --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/sequential_participant_factory.py @@ -0,0 +1,127 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio + +from agent_framework import ( + ChatAgent, + ChatMessage, + Executor, + Role, + SequentialBuilder, + Workflow, + WorkflowContext, + handler, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +""" +Sample: Sequential workflow with participant factories + +This sample demonstrates how to create a sequential workflow with participant factories. + +Using participant factories allows you to set up proper state isolation between workflow +instances created by the same builder. This is particularly useful when you need to handle +requests or tasks in parallel with stateful participants. + +In this example, we create a sequential workflow with two participants: an accumulator +and a content producer. The accumulator is stateful and maintains a list of all messages it has +received. Context is maintained across runs of the same workflow instance but not across different +workflow instances. +""" + + +class Accumulate(Executor): + """Simple accumulator. + + Accumulates all messages from the conversation and prints them out. + """ + + def __init__(self, id: str): + super().__init__(id) + # Some internal state to accumulate messages + self._accumulated: list[str] = [] + + @handler + async def accumulate(self, conversation: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None: + self._accumulated.extend([msg.text for msg in conversation]) + print(f"Number of queries received so far: {len(self._accumulated)}") + await ctx.send_message(conversation) + + +def create_agent() -> ChatAgent: + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions="Produce a concise paragraph answering the user's request.", + name="ContentProducer", + ) + + +async def run_workflow(workflow: Workflow, query: str) -> None: + events = await workflow.run(query) + outputs = events.get_outputs() + + if outputs: + messages: list[ChatMessage] = outputs[0] + for message in messages: + name = message.author_name or ("assistant" if message.role == Role.ASSISTANT else "user") + print(f"{name}: {message.text}") + else: + raise RuntimeError("No outputs received from the workflow.") + + +async def main() -> None: + # 1) Create a builder with participant factories + builder = SequentialBuilder().register_participants([ + lambda: Accumulate("accumulator"), + create_agent, + ]) + # 2) Build workflow_a + workflow_a = builder.build() + + # 3) Run workflow_a + # Context is maintained across runs + print("=== First Run on workflow_a ===") + await run_workflow(workflow_a, "Why is the sky blue?") + print("\n=== Second Run on workflow_a ===") + await run_workflow(workflow_a, "Repeat my previous question.") + + # 4) Build workflow_b + # This will create a new instance of the accumulator and content producer + # using the same workflow builder + workflow_b = builder.build() + + # 5) Run workflow_b + # Context is not maintained across instances + print("\n=== First Run on workflow_b ===") + await run_workflow(workflow_b, "Repeat my previous question.") + + """ + Sample Output: + + === First Run on workflow_a === + Number of queries received so far: 1 + user: Why is the sky blue? + ContentProducer: The sky appears blue due to a phenomenon called Rayleigh scattering. + When sunlight enters the Earth's atmosphere, it collides with gases + and particles, scattering shorter wavelengths of light (blue and violet) + more than the longer wavelengths (red and yellow). Although violet light + is scattered even more than blue, our eyes are more sensitive to blue + light, and some violet light is absorbed by the ozone layer. As a result, + we perceive the sky as predominantly blue during the day. + + === Second Run on workflow_a === + Number of queries received so far: 2 + user: Repeat my previous question. + ContentProducer: Why is the sky blue? + + === First Run on workflow_b === + Number of queries received so far: 1 + user: Repeat my previous question. + ContentProducer: I'm sorry, but I can't repeat your previous question as I don't have + access to your past queries. However, feel free to ask anything again, + and I'll be happy to help! + """ + + +if __name__ == "__main__": + asyncio.run(main())